0001-agendas-handle-import-export-of-unavailability-calen.patch
chrono/agendas/models.py | ||
---|---|---|
1149 | 1149 |
def import_json(cls, data): |
1150 | 1150 |
timeperiods = data.pop('timeperiods', []) |
1151 | 1151 |
exceptions = data.pop('exceptions', []) |
1152 |
unavailability_calendars = data.pop('unavailability_calendars', []) |
|
1152 | 1153 |
data = clean_import_data(cls, data) |
1153 | 1154 |
desk, created = cls.objects.update_or_create(slug=data['slug'], agenda=data['agenda'], defaults=data) |
1154 | 1155 |
for timeperiod in timeperiods: |
... | ... | |
1157 | 1158 |
for exception in exceptions: |
1158 | 1159 |
exception['desk'] = desk |
1159 | 1160 |
TimePeriodException.import_json(exception) |
1161 |
for unavailability_calendar in unavailability_calendars: |
|
1162 |
try: |
|
1163 |
slug = unavailability_calendar['slug'] |
|
1164 |
target_calendar = UnavailabilityCalendar.objects.get(slug=slug) |
|
1165 |
desk.unavailability_calendars.add(target_calendar) |
|
1166 |
except UnavailabilityCalendar.DoesNotExist: |
|
1167 |
raise AgendaImportError(_('The unavailability calendar "%s" does not exist.') % slug) |
|
1168 | ||
1160 | 1169 |
return desk |
1161 | 1170 | |
1162 | 1171 |
def export_json(self): |
... | ... | |
1165 | 1174 |
'slug': self.slug, |
1166 | 1175 |
'timeperiods': [time_period.export_json() for time_period in self.timeperiod_set.all()], |
1167 | 1176 |
'exceptions': [exception.export_json() for exception in self.timeperiodexception_set.all()], |
1177 |
'unavailability_calendars': [{'slug': x.slug} for x in self.unavailability_calendars.all()], |
|
1168 | 1178 |
} |
1169 | 1179 | |
1170 | 1180 |
def duplicate(self, label=None, agenda_target=None): |
... | ... | |
1543 | 1553 |
def get_absolute_url(self):
    """Return the URL reversed from the unavailability calendar view route for this object's id."""
    url_kwargs = {'pk': self.id}
    return reverse('chrono-manager-unavailability-calendar-view', kwargs=url_kwargs)
1545 | 1555 | |
1556 |
def export_json(self):
    """Return a JSON-serializable dict describing this unavailability calendar.

    The view and edit roles are exported by name (``None`` when unset), and
    each attached time period exception is exported through its own
    ``export_json`` method.
    """
    view_role_name = self.view_role.name if self.view_role else None
    edit_role_name = self.edit_role.name if self.edit_role else None

    exported_exceptions = []
    for period_exception in self.timeperiodexception_set.all():
        exported_exceptions.append(period_exception.export_json())

    return {
        'label': self.label,
        'slug': self.slug,
        'permissions': {
            'view': view_role_name,
            'edit': edit_role_name,
        },
        'exceptions': exported_exceptions,
    }
|
1567 | ||
1568 |
@classmethod
def import_json(cls, data, overwrite=False):
    """Create or update an unavailability calendar from exported data.

    ``data`` is a dict shaped like the output of ``export_json``. Roles
    listed under ``permissions`` are looked up by group name. When
    ``overwrite`` is true, the time period exceptions already attached to
    the calendar are deleted before the imported ones are added.

    Returns True when the calendar was created, False when an existing
    calendar with the same slug was updated.

    Raises ``Group.DoesNotExist`` when a referenced role does not exist.
    """
    data = data.copy()
    permissions = data.pop('permissions', {})
    exceptions = data.pop('exceptions', [])
    for permission in ('view', 'edit'):
        if permissions.get(permission):
            data[permission + '_role'] = Group.objects.get(name=permissions[permission])
    data = clean_import_data(cls, data)
    # update_or_create saves the imported values on an already existing
    # calendar; setting attributes on the instance without calling save()
    # would silently drop them.
    unavailability_calendar, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
    if overwrite:
        TimePeriodException.objects.filter(unavailability_calendar=unavailability_calendar).delete()
    for exception in exceptions:
        # Pass a copy so the caller's payload is not modified here.
        TimePeriodException.import_json(dict(exception, unavailability_calendar=unavailability_calendar))

    return created
|
1588 | ||
1546 | 1589 | |
1547 | 1590 |
class TimePeriodException(models.Model): |
1548 | 1591 |
desk = models.ForeignKey(Desk, on_delete=models.CASCADE, null=True) |
chrono/manager/utils.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import collections |
|
17 | 18 |
import itertools |
18 | 19 | |
19 | 20 |
from django.contrib.auth.models import Group |
20 | 21 |
from django.db import transaction |
21 | 22 |
from django.db.models import Q |
22 | 23 | |
23 |
from chrono.agendas.models import Agenda, AgendaImportError |
|
24 |
from chrono.agendas.models import Agenda, AgendaImportError, UnavailabilityCalendar
|
|
24 | 25 | |
25 | 26 | |
26 | 27 |
def export_site(): |
27 | 28 |
'''Dump site objects to JSON-dumpable dictionary''' |
28 |
data = {} |
|
29 |
data = collections.OrderedDict() |
|
30 |
data['unavailability_calendars'] = [x.export_json() for x in UnavailabilityCalendar.objects.all()] |
|
29 | 31 |
qs1 = Agenda.objects.filter(~Q(kind='virtual')) |
30 | 32 |
qs2 = Agenda.objects.filter(kind='virtual') |
31 | 33 |
data['agendas'] = [x.export_json() for x in itertools.chain(qs1, qs2)] |
... | ... | |
33 | 35 | |
34 | 36 | |
35 | 37 |
def import_site(data, if_empty=False, clean=False, overwrite=False): |
36 |
if if_empty and Agenda.objects.count():
|
|
38 |
if if_empty and (Agenda.objects.count() or UnavailabilityCalendar.objects.count()):
|
|
37 | 39 |
return |
38 | 40 | |
39 | 41 |
if clean: |
40 | 42 |
Agenda.objects.all().delete() |
43 |
UnavailabilityCalendar.objects.all().delete() |
|
41 | 44 | |
42 | 45 |
results = {'created': 0, 'updated': 0} |
43 | 46 |
agendas = data.get('agendas', []) |
47 |
unavailability_calendars = data.get('unavailability_calendars', []) |
|
44 | 48 | |
45 |
role_names = {name for data in agendas for _, name in data.get('permissions', {}).items() if name} |
|
49 |
role_names = set() |
|
50 |
for objs in (agendas, unavailability_calendars): |
|
51 |
role_names = role_names.union( |
|
52 |
{name for data in objs for _, name in data.get('permissions', {}).items() if name} |
|
53 |
) |
|
46 | 54 |
existing_roles = Group.objects.filter(name__in=role_names) |
47 | 55 | |
48 | 56 |
if existing_roles.count() != len(role_names): |
... | ... | |
50 | 58 |
raise AgendaImportError('Missing roles: "%s"' % ', '.join(role_names - existing_roles_names)) |
51 | 59 | |
52 | 60 |
with transaction.atomic(): |
53 |
for data in agendas: |
|
54 |
created = Agenda.import_json(data, overwrite=overwrite) |
|
55 |
if created: |
|
56 |
results['created'] += 1 |
|
57 |
else: |
|
58 |
results['updated'] += 1 |
|
61 |
for objs, cls in ((unavailability_calendars, UnavailabilityCalendar), (agendas, Agenda)): |
|
62 |
for data in objs: |
|
63 |
created = cls.import_json(data, overwrite=overwrite) |
|
64 |
if created: |
|
65 |
results['created'] += 1 |
|
66 |
else: |
|
67 |
results['updated'] += 1 |
|
59 | 68 |
return results |
tests/test_import_export.py | ||
---|---|---|
30 | 30 |
VirtualMember, |
31 | 31 |
AgendaNotificationsSettings, |
32 | 32 |
AgendaReminderSettings, |
33 |
UnavailabilityCalendar, |
|
33 | 34 |
) |
34 | 35 |
from chrono.manager.utils import import_site |
35 | 36 | |
... | ... | |
531 | 532 | |
532 | 533 |
# again - check OneToOneField |
533 | 534 |
import_site(payload) |
535 | ||
536 | ||
537 |
def test_import_export_unavailability_calendar(app):
    """Round-trip an unavailability calendar through export_site / import_site.

    Checks that the calendar's label, roles, time period exceptions and its
    link to a desk are all restored by the import.
    """
    exported = json.loads(get_output_of_command('export_site'))
    assert len(exported['unavailability_calendars']) == 0

    view_group = Group.objects.create(name=u'gé1')
    edit_group = Group.objects.create(name=u'gé2')
    calendar = UnavailabilityCalendar.objects.create(
        label='Calendar', view_role=view_group, edit_role=edit_group
    )

    # Two exceptions, 8:00 to 12:30 on May 22nd of 2017 and of 2018.
    periods = []
    created_exceptions = []
    for year in (2017, 2018):
        begin = make_aware(datetime.datetime(year, 5, 22, 8, 0))
        finish = make_aware(datetime.datetime(year, 5, 22, 12, 30))
        periods.append((begin, finish))
        created_exceptions.append(
            TimePeriodException.objects.create(
                unavailability_calendar=calendar, start_datetime=begin, end_datetime=finish
            )
        )

    agenda = Agenda.objects.create(label='Foo Bar', kind='meetings')
    MeetingType.objects.create(agenda=agenda, label='Meeting Type', duration=30)
    original_desk = Desk.objects.create(agenda=agenda, label='Desk')
    original_desk.unavailability_calendars.add(calendar)

    exported = json.loads(get_output_of_command('export_site'))
    assert len(exported['unavailability_calendars']) == 1
    assert len(exported['agendas']) == 1

    # Wipe everything that was just exported before importing it back.
    calendar.delete()
    for period_exception in created_exceptions:
        period_exception.delete()
    agenda.delete()
    assert not UnavailabilityCalendar.objects.exists()
    assert not TimePeriodException.objects.exists()
    assert not Agenda.objects.exists()
    assert not Desk.objects.exists()

    import_site(exported)
    assert UnavailabilityCalendar.objects.count() == 1
    imported_calendar = UnavailabilityCalendar.objects.first()
    assert imported_calendar.label == 'Calendar'
    assert imported_calendar.view_role == view_group
    assert imported_calendar.edit_role == edit_group
    assert imported_calendar.timeperiodexception_set.count() == 2
    for begin, finish in periods:
        assert TimePeriodException.objects.get(
            unavailability_calendar=imported_calendar, start_datetime=begin, end_datetime=finish
        )

    imported_agenda = Agenda.objects.get(label='Foo Bar')
    imported_desk = imported_agenda.desk_set.first()
    assert imported_desk.unavailability_calendars.count() == 1
    assert imported_desk.unavailability_calendars.first() == imported_calendar
|
534 |
- |