0001-general-add-export-import-support-15527.patch
chrono/agendas/models.py | ||
---|---|---|
23 | 23 |
from django.utils.dates import WEEKDAYS |
24 | 24 |
from django.utils.formats import date_format |
25 | 25 |
from django.utils.text import slugify |
26 |
from django.utils.timezone import localtime, now |
|
26 |
from django.utils.timezone import localtime, now, make_aware
|
|
27 | 27 |
from django.utils.translation import ugettext_lazy as _ |
28 | 28 | |
29 | 29 |
from jsonfield import JSONField |
... | ... | |
79 | 79 |
group_ids = [x.id for x in user.groups.all()] |
80 | 80 |
return bool(self.view_role_id in group_ids) |
81 | 81 | |
82 |
def export_json(self):
    """Serialize this agenda to a JSON-dumpable dictionary.

    Roles are exported by name (``None`` when no role is set).  An agenda
    of kind 'events' also carries its events; an agenda of kind 'meetings'
    carries its meeting types and time periods.
    """
    def role_name(role):
        return role.name if role else None

    exported = {
        'label': self.label,
        'slug': self.slug,
        'kind': self.kind,
        'minimal_booking_delay': self.minimal_booking_delay,
        'maximal_booking_delay': self.maximal_booking_delay,
        'permissions': {
            'view': role_name(self.view_role),
            'edit': role_name(self.edit_role),
        },
    }

    # export key -> name of the related manager holding the children;
    # managers are looked up lazily so only the relevant ones are touched.
    relations_by_kind = {
        'events': [('events', 'event_set')],
        'meetings': [
            ('meetingtypes', 'meetingtype_set'),
            ('timeperiods', 'timeperiod_set'),
        ],
    }
    for key, manager_name in relations_by_kind.get(self.kind, []):
        manager = getattr(self, manager_name)
        exported[key] = [child.export_json() for child in manager.all()]
    return exported
|
100 | ||
101 |
@classmethod
def import_json(cls, data, overwrite=False):
    """Create (or complete) an agenda from a dict produced by export_json().

    The agenda is looked up by slug and created from ``data`` when it does
    not exist yet.  Child objects listed in ``data`` (events, or meeting
    types and time periods, depending on ``data['kind']``) are then created
    and saved.

    ``data`` and the dictionaries nested in it are left untouched, so the
    same payload can be imported several times.

    :param data: exported agenda dictionary.
    :param overwrite: when true, existing child objects of the agenda are
        deleted before the exported ones are created.
    :returns: the agenda the children were attached to.
    """
    data = data.copy()
    # NOTE(review): exported permissions are removed from the payload but
    # never applied, so view/edit roles are not restored by an import.
    data.pop('permissions', None)
    # Child lists are not agenda fields: always take them out of the
    # defaults handed to get_or_create().
    events = data.pop('events', [])
    meetingtypes = data.pop('meetingtypes', [])
    timeperiods = data.pop('timeperiods', [])
    kind = data['kind']

    agenda = cls.objects.get_or_create(slug=data['slug'], defaults=data)[0]

    if kind == 'events':
        children = [(Event, events)]
    elif kind == 'meetings':
        children = [(MeetingType, meetingtypes), (TimePeriod, timeperiods)]
    else:
        children = []

    if overwrite:
        for model, _items in children:
            model.objects.filter(agenda=agenda).delete()

    for model, items in children:
        for item in items:
            # Build a fresh dict instead of writing 'agenda' into the
            # caller's nested data.
            model.import_json(dict(item, agenda=agenda)).save()
    return agenda
|
82 | 127 | |
83 | 128 |
WEEKDAYS_LIST = sorted(WEEKDAYS.items(), key=lambda x: x[0]) |
84 | 129 | |
... | ... | |
114 | 159 |
def weekday_str(self):
    """Return the WEEKDAYS entry matching this object's weekday number."""
    day_number = self.weekday
    return WEEKDAYS[day_number]
116 | 161 | |
162 |
@classmethod
def import_json(cls, data):
    """Build an unsaved instance, passing the items of ``data`` as
    keyword arguments to the class constructor."""
    instance = cls(**data)
    return instance
|
165 | ||
166 |
def export_json(self):
    """Return a JSON-dumpable dict with the weekday and the start and end
    times, the times being formatted as HH:MM strings."""
    time_format = '%H:%M'
    exported = {'weekday': self.weekday}
    exported['start_time'] = self.start_time.strftime(time_format)
    exported['end_time'] = self.end_time.strftime(time_format)
    return exported
|
172 | ||
117 | 173 |
def get_time_slots(self, min_datetime, max_datetime, meeting_type): |
118 | 174 |
duration = datetime.timedelta(minutes=meeting_type.duration) |
119 | 175 | |
... | ... | |
164 | 220 |
self.slug = slug |
165 | 221 |
super(MeetingType, self).save(*args, **kwargs) |
166 | 222 | |
223 |
@classmethod
def import_json(cls, data):
    """Build an unsaved instance, passing the items of ``data`` as
    keyword arguments to the class constructor."""
    instance = cls(**data)
    return instance
|
226 | ||
227 |
def export_json(self):
    """Return a JSON-dumpable dict holding the label, slug and duration."""
    exported_fields = ('label', 'slug', 'duration')
    return dict((name, getattr(self, name)) for name in exported_fields)
|
233 | ||
167 | 234 | |
168 | 235 |
class Event(models.Model): |
169 | 236 |
agenda = models.ForeignKey(Agenda) |
... | ... | |
222 | 289 |
def get_absolute_url(self):
    """Reverse the 'chrono-manager-event-edit' URL for this object's id."""
    url_kwargs = {'pk': self.id}
    return reverse('chrono-manager-event-edit', kwargs=url_kwargs)
224 | 291 | |
292 |
@classmethod
def import_json(cls, data):
    """Build an unsaved event from a dict produced by export_json().

    ``data['start_datetime']`` is a 'YYYY-MM-DD HH:MM:SS' string; it is
    parsed and passed through make_aware() before being handed, with the
    other items of ``data``, to the class constructor.

    :param data: exported event dictionary; it is not modified.
    :returns: the new, unsaved instance.
    """
    # Work on a copy: replacing the string in the caller's dict would make
    # a second import of the same payload fail in strptime().
    data = dict(data)
    naive_start = datetime.datetime.strptime(
        data['start_datetime'], '%Y-%m-%d %H:%M:%S')
    data['start_datetime'] = make_aware(naive_start)
    return cls(**data)
|
297 | ||
298 |
def export_json(self):
    """Return a JSON-dumpable dict describing this event.

    The start date is exported as a 'YYYY-MM-DD HH:MM:SS' string.  An
    aware datetime is first converted with localtime() so the exported
    wall-clock time is the one import_json() expects when it calls
    make_aware(); formatting it as is would shift the event by the UTC
    offset on a round trip.
    """
    start = self.start_datetime
    # Naive values are formatted unchanged; only aware ones can (and need
    # to) be converted.
    if start.tzinfo is not None and start.utcoffset() is not None:
        start = localtime(start)
    return {
        'start_datetime': start.strftime('%Y-%m-%d %H:%M:%S'),
        'places': self.places,
        'waiting_list_places': self.waiting_list_places,
        'label': self.label,
    }
|
305 | ||
225 | 306 | |
226 | 307 |
class Booking(models.Model): |
227 | 308 |
event = models.ForeignKey(Event) |
chrono/manager/management/commands/export_site.py | ||
---|---|---|
1 |
# chrono - agendas system |
|
2 |
# Copyright (C) 2016-2017 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import json |
|
18 |
import sys |
|
19 |
from optparse import make_option |
|
20 | ||
21 |
from django.core.management.base import BaseCommand |
|
22 | ||
23 |
from chrono.manager.utils import export_site |
|
24 | ||
25 | ||
26 |
class Command(BaseCommand):
    """Management command writing the export_site() result as JSON."""
    args = ''
    help = 'Export the site'
    option_list = BaseCommand.option_list + (
        make_option('--output', metavar='FILE', default=None,
                    help='name of a file to write output to'),
    )

    def handle(self, *args, **options):
        """Dump the site as indented JSON.

        The JSON goes to the file named by the ``output`` option when it
        is set, and to standard output otherwise.
        """
        site = export_site()
        if options['output']:
            # Close (and therefore flush) the file as soon as the dump is
            # done instead of leaving it to garbage collection.
            with open(options['output'], 'w') as output:
                json.dump(site, output, indent=4)
        else:
            json.dump(site, sys.stdout, indent=4)
chrono/manager/management/commands/import_site.py | ||
---|---|---|
1 |
# chrono - agendas system |
|
2 |
# Copyright (C) 2016-2017 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import json |
|
18 |
from optparse import make_option |
|
19 | ||
20 |
from django.core.management.base import BaseCommand |
|
21 | ||
22 |
from chrono.manager.utils import import_site |
|
23 | ||
24 | ||
25 |
class Command(BaseCommand):
    """Management command feeding a JSON file to import_site()."""
    args = '<filename>'
    help = 'Import an exported site'
    option_list = BaseCommand.option_list + (
        make_option('--clean', action='store_true', default=False,
                    help='Clean site before importing'),
        make_option('--if-empty', action='store_true', default=False,
                    help='Import only if site is empty'),
        make_option('--overwrite', action='store_true', default=False,
                    help='Overwrite existing data'),
    )

    def handle(self, filename, **options):
        """Load the JSON document stored in ``filename`` and import it,
        forwarding the ``if_empty``, ``clean`` and ``overwrite`` options."""
        # Read the whole document first so the file is closed before the
        # import starts.
        with open(filename) as fd:
            site = json.load(fd)
        import_site(site,
                    if_empty=options['if_empty'],
                    clean=options['clean'],
                    overwrite=options['overwrite'])
chrono/manager/utils.py | ||
---|---|---|
1 |
# chrono - agendas system |
|
2 |
# Copyright (C) 2016-2017 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from django.db import transaction |
|
18 | ||
19 |
from chrono.agendas.models import Agenda |
|
20 | ||
21 | ||
22 |
def export_site():
    '''Dump site objects to a JSON-dumpable dictionary'''
    agendas = []
    for agenda in Agenda.objects.all():
        agendas.append(agenda.export_json())
    return {'agendas': agendas}
|
27 | ||
28 | ||
29 |
def import_site(data, if_empty=False, clean=False, overwrite=False):
    '''Load agendas from a dictionary produced by export_site().

    :param data: dictionary with an optional 'agendas' list.
    :param if_empty: do nothing when at least one agenda already exists.
    :param clean: delete all existing agendas before importing.
    :param overwrite: forwarded to Agenda.import_json().
    '''
    if if_empty and Agenda.objects.exists():
        return

    # The cleanup is part of the transaction: if an agenda fails to import
    # the deletion is rolled back too, instead of leaving a wiped site.
    with transaction.atomic():
        if clean:
            Agenda.objects.all().delete()

        for agenda_data in data.get('agendas', []):
            Agenda.import_json(agenda_data, overwrite=overwrite)
tests/test_api.py | ||
---|---|---|
47 | 47 | |
48 | 48 |
@pytest.fixture |
49 | 49 |
def meetings_agenda(): |
50 |
agenda = Agenda(label=u'Foo bar', kind='meetings', |
|
50 |
agenda = Agenda(label=u'Foo bar Meeting', kind='meetings',
|
|
51 | 51 |
minimal_booking_delay=1, maximal_booking_delay=56) |
52 | 52 |
agenda.save() |
53 | 53 |
meeting_type = MeetingType(agenda=agenda, label='Blah', duration=30) |
... | ... | |
304 | 304 |
assert resp.json['err'] == 1 |
305 | 305 | |
306 | 306 |
def test_booking_cancellation_post_meeting_api(app, meetings_agenda, user): |
307 |
agenda_id = Agenda.objects.filter(label=u'Foo bar')[0].id |
|
307 |
agenda_id = Agenda.objects.filter(label=u'Foo bar Meeting')[0].id
|
|
308 | 308 |
meeting_type = MeetingType.objects.get(agenda=meetings_agenda) |
309 | 309 |
resp = app.get('/api/agenda/meetings/%s/datetimes/' % meeting_type.id) |
310 | 310 |
nb_events = len(resp.json['data']) |
tests/test_import_export.py | ||
---|---|---|
1 |
from cStringIO import StringIO |
|
2 |
import datetime |
|
3 |
import json |
|
4 |
import os |
|
5 |
import shutil |
|
6 |
import sys |
|
7 |
import tempfile |
|
8 | ||
9 |
import pytest |
|
10 |
from django.core.management import call_command |
|
11 | ||
12 |
from chrono.agendas.models import Agenda, Event, MeetingType, TimePeriod |
|
13 |
from chrono.manager.utils import export_site, import_site |
|
14 | ||
15 |
from test_api import some_data, meetings_agenda |
|
16 | ||
17 |
pytestmark = pytest.mark.django_db |
|
18 | ||
19 |
def get_output_of_command(command, *args, **kwargs):
    """Run a management command and return what it wrote to sys.stdout.

    sys.stdout is replaced by an in-memory buffer for the duration of the
    call and is restored even when the command raises, so a failing
    command cannot swallow the output of the tests that follow.
    """
    old_stdout = sys.stdout
    output = sys.stdout = StringIO()
    try:
        call_command(command, *args, **kwargs)
    finally:
        sys.stdout = old_stdout
    return output.getvalue()
|
25 | ||
26 |
def test_import_export(app, some_data, meetings_agenda):
    """Round-trip the site through export_site/import_site and check the
    clean, overwrite, if_empty and --output behaviours."""
    output = get_output_of_command('export_site')
    assert len(json.loads(output)['agendas']) == 3
    import_site(data={}, clean=True)
    empty_output = get_output_of_command('export_site')
    assert len(json.loads(empty_output)['agendas']) == 0

    with tempfile.NamedTemporaryFile() as f:
        f.write(output)
        f.flush()
        call_command('import_site', f.name)

    assert Agenda.objects.count() == 3

    agenda1 = Agenda.objects.get(label=u'Foo bar')
    agenda2 = Agenda.objects.get(label=u'Foo bar Meeting')

    def add_extra_objects():
        # Objects that are not part of the exported data.
        event = Event(agenda=agenda1, start_datetime=datetime.datetime.now(), places=10)
        event.save()
        timeperiod = TimePeriod(agenda=agenda2, weekday=2,
                                start_time=datetime.time(10, 0), end_time=datetime.time(11, 0))
        timeperiod.save()
        return event, timeperiod

    # overwrite=True removes objects missing from the imported data
    event, timeperiod = add_extra_objects()
    import_site(json.loads(output), overwrite=True)
    assert Event.objects.filter(id=event.id).count() == 0
    assert TimePeriod.objects.filter(id=timeperiod.id).count() == 0

    # overwrite=False keeps them
    event, timeperiod = add_extra_objects()
    import_site(json.loads(output), overwrite=False)
    assert Event.objects.filter(id=event.id).count() == 1
    assert TimePeriod.objects.filter(id=timeperiod.id).count() == 1

    import_site(data={}, if_empty=True)
    assert Agenda.objects.count() == 3

    import_site(data={}, clean=True)
    tempdir = tempfile.mkdtemp('chrono-test')
    try:
        export_path = os.path.join(tempdir, 't.json')
        get_output_of_command('export_site', output=export_path)
        assert os.path.exists(export_path)
    finally:
        # Remove the directory even when the assertion fails.
        shutil.rmtree(tempdir)
|
0 |
- |