Projet

Général

Profil

0001-add-events-importation-from-csv-16428.patch

Josué Kouka, 16 juin 2017 02:43

Télécharger (7,36 ko)

Voir les différences:

Subject: [PATCH] add events importation from csv (#16428)

 .../manager/management/commands/import_events.py   | 100 +++++++++++++++++++++
 tests/data/concerto.csv                            |  15 ++++
 tests/test_import_export.py                        |  17 ++++
 3 files changed, 132 insertions(+)
 create mode 100644 chrono/manager/management/commands/import_events.py
 create mode 100644 tests/data/concerto.csv
chrono/manager/management/commands/import_events.py
1
# -*- coding: utf-8 -*-
2
# chrono - agendas system
3
# Copyright (C) 2016-2017  Entr'ouvert
4
#
5
# This program is free software: you can redistribute it and/or modify it
6
# under the terms of the GNU Affero General Public License as published
7
# by the Free Software Foundation, either version 3 of the License, or
8
# (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17

  
18
import csv
19
import datetime
20
from itertools import groupby
21

  
22
from django.core.management.base import BaseCommand
23
from django.utils.text import slugify
24

  
25
from chrono.agendas.models import Agenda, Event
26

  
27

  
28
def csv_get_dialect(filename):
29
    with open(filename, 'rb') as fd:
30
        content = fd.read()
31
        content = content.decode('utf-8-sig', 'replace').encode('utf-8')
32
        dialect = csv.Sniffer().sniff(content)
33
        return dialect
34

  
35

  
36
def csv_get_dict_data(filename, fieldnames=[], skip_header=True):
37
    with open(filename, 'rb') as fd:
38
        kwargs = {'fieldnames': fieldnames}
39
        kwargs['dialect'] = csv_get_dialect(filename)
40
        reader = csv.DictReader(fd, **kwargs)
41
        if skip_header:
42
            reader.next()
43
        return list(reader)
44
    return False
45

  
46

  
47
class BaseLoader(object):
48

  
49
    def __init__(self, filename):
50
        self.filename = filename
51

  
52
    def create(self):
53
        raise NotImplemented
54

  
55

  
56
class ConcertoLoader(BaseLoader):
57
    name = 'concerto'
58
    fieldnames = [
59
        'ID_LIE', 'DAT_JOUR_TMP', 'HEU_DEBUT_TMP', 'HEU_FIN_TMP',
60
        'NB_PLACESPOSS_TMP', 'NB_PLACESOCC_TMP', 'NB_PLACESRESFUT_TMP',
61
        'NB_PLACESRES_TMP', 'NB_PLACESLIBRES_TMP', 'NB_DUREE_TMP',
62
        'LIB_NOM_LIE', 'NB_PLACESRESFUTPOND_TMP']
63

  
64
    def __init__(self, filename, **kwargs):
65
        super(ConcertoLoader, self).__init__(filename)
66

  
67
    def create(self):
68
        data = csv_get_dict_data(self.filename, fieldnames=self.fieldnames)
69
        for nursery, events in groupby(data, lambda x: x['LIB_NOM_LIE']):
70
            nursery_slug = slugify(nursery)
71
            agenda, True = Agenda.objects.get_or_create(slug=nursery_slug, label=nursery)
72
            # purge all
73
            Event.objects.filter(agenda=agenda).delete()
74
            for event in events:
75
                event_day = datetime.datetime.strptime(event['DAT_JOUR_TMP'], '%Y%m%d')
76
                event_time = datetime.datetime.strptime(event['HEU_DEBUT_TMP'], '%H%M')
77
                event_datetime = datetime.datetime.combine(event_day, event_time.time())
78
                # XXX: value such as '-' OR '-76,24'
79
                try:
80
                    event_places = int(event['NB_PLACESLIBRES_TMP'].split(',')[0])
81
                except ValueError:
82
                    event_places = 0
83
                Event.objects.create(agenda=agenda, start_datetime=event_datetime, places=event_places)
84

  
85

  
86
class Command(BaseCommand):
87
    help = "Import events from csv file"
88

  
89
    def add_arguments(self, parser):
90
        parser.add_argument('loader', type=str, help='type of file')
91
        parser.add_argument('filename', type=str, help='name of the file to import')
92

  
93
    def handle(self, filename, loader, **options):
94
        for Loader in BaseLoader.__subclasses__():
95
            if Loader.name == loader:
96
                obj = Loader(filename)
97
                obj.create()
98
                break
99
        else:
100
            self.stdout.write('Invalid loader: %s' % loader)
tests/data/concerto.csv
1
ID_LIE;DAT_JOUR_TMP;HEU_DEBUT_TMP;HEU_FIN_TMP;NB_PLACESPOSS_TMP;NB_PLACESOCC_TMP;NB_PLACESRESFUT_TMP;NB_PLACESRES_TMP;NB_PLACESLIBRES_TMP;NB_DUREE_TMP;LIB_NOM_LIE;NB_PLACESRESFUTPOND_TMP
2
16;20170314;1430;1500;45;46;2;49;-3;4;Crèche Collective du BARON;-27,675
3
16;20170314;1500;1530;45;46;2;49;-3;4;Crèche Collective du BARON;-27,675
4
16;20170314;1530;1600;45;46;2;49;-3;4;Crèche Collective du BARON;-27,675
5
16;20170313;1600;1630;45;42,75;0;44,75;2,25;4;Crèche Collective du BARON;
6
16;20170313;1630;1700;45;42,75;0;44,75;2,25;4;Crèche Collective du BARON;
7
16;20170313;1700;1730;45;20,25;0;22,25;24,75;4;Crèche Collective du BARON;
8
16;20170313;1730;1800;45;20,25;0;22,25;24,75;4;Crèche Collective du BARON;
9
17;20170313;1700;1730;72;37;0;35;35;4;Crèche Collective des BLOSSIERES;
10
17;20170313;1730;1800;72;37;0;35;35;4;Crèche Collective des BLOSSIERES;
11
17;20170314;1000;1030;72;66,25;0;80,5;5,75;4;Crèche Collective des BLOSSIERES;
12
17;20170314;1030;1100;72;66,25;0;80,5;5,75;4;Crèche Collective des BLOSSIERES;
13
17;20170314;1300;1330;72;65;0;75;7;4;Crèche Collective des BLOSSIERES;
14
17;20170314;1330;1400;72;65;0;75;7;4;Crèche Collective des BLOSSIERES;
15

  
tests/test_import_export.py
8 8

  
9 9
import pytest
10 10
from django.core.management import call_command
11
from django.utils.dateparse import parse_datetime
11 12

  
12 13
from chrono.agendas.models import Agenda, Event, MeetingType, TimePeriod
13 14
from chrono.manager.utils import export_site, import_site
......
76 77
    empty_output = get_output_of_command('export_site', output=os.path.join(tempdir, 't.json'))
77 78
    assert os.path.exists(os.path.join(tempdir, 't.json'))
78 79
    shutil.rmtree(tempdir)
80

  
81

  
82
def test_concerto_import():
83
    filepath = os.path.join(os.path.dirname(__file__), 'data', 'concerto.csv')
84
    output = get_output_of_command('import_events', 'toto', filepath)
85
    assert output == 'Invalid loader: toto\n'
86
    call_command('import_events', 'concerto', filepath)
87
    assert Agenda.objects.count() == 2
88
    bloissieres = Agenda.objects.get(slug='creche-collective-des-blossieres')
89
    assert Event.objects.filter(agenda=bloissieres).count() == 6
90
    dt = parse_datetime('2017-03-14 12:30:00+00:00')
91
    assert bloissieres.event_set.get(start_datetime=dt).places == 7
92
    baron = Agenda.objects.get(slug='creche-collective-du-baron')
93
    assert Event.objects.filter(agenda=baron).count() == 7
94
    dt = parse_datetime('2017-03-14 14:30:00+00:00')
95
    assert baron.event_set.get(start_datetime=dt).places == -3
79
-