0001-agendas-handle-import-export-of-unavailability-calen.patch
chrono/agendas/models.py | ||
---|---|---|
1147 | 1147 |
timeperiods = data.pop('timeperiods', []) |
1148 | 1148 |
exceptions = data.pop('exceptions', []) |
1149 | 1149 |
sources = data.pop('exception_sources', []) |
1150 |
unavailability_calendars = data.pop('unavailability_calendars', []) |
|
1150 | 1151 |
data = clean_import_data(cls, data) |
1151 | 1152 |
desk, created = cls.objects.update_or_create(slug=data['slug'], agenda=data['agenda'], defaults=data) |
1152 | 1153 |
for timeperiod in timeperiods: |
... | ... | |
1158 | 1159 |
for source in sources: |
1159 | 1160 |
source['desk'] = desk |
1160 | 1161 |
TimePeriodExceptionSource.import_json(source) |
1162 |
for unavailability_calendar in unavailability_calendars: |
|
1163 |
try: |
|
1164 |
slug = unavailability_calendar['slug'] |
|
1165 |
target_calendar = UnavailabilityCalendar.objects.get(slug=slug) |
|
1166 |
desk.unavailability_calendars.add(target_calendar) |
|
1167 |
except UnavailabilityCalendar.DoesNotExist: |
|
1168 |
raise AgendaImportError(_('The unavailability calendar "%s" does not exist.') % slug) |
|
1169 | ||
1161 | 1170 |
return desk |
1162 | 1171 | |
1163 | 1172 |
def export_json(self): |
... | ... | |
1169 | 1178 |
'timeperiods': [time_period.export_json() for time_period in self.timeperiod_set.filter()], |
1170 | 1179 |
'exceptions': [exception.export_json() for exception in time_period_exceptions], |
1171 | 1180 |
'exception_sources': [source.export_json() for source in time_period_exception_sources], |
1181 |
'unavailability_calendars': [{'slug': x.slug} for x in self.unavailability_calendars.all()], |
|
1172 | 1182 |
} |
1173 | 1183 | |
1174 | 1184 |
def duplicate(self, label=None, agenda_target=None): |
... | ... | |
1562 | 1572 |
def get_absolute_url(self):
    """Return the URL of the 'chrono-manager-unavailability-calendar-view' route for this object's id."""
    url_kwargs = {'pk': self.id}
    return reverse('chrono-manager-unavailability-calendar-view', kwargs=url_kwargs)
1564 | 1574 | |
1575 |
def export_json(self):
    """Serialize this unavailability calendar into a JSON-dumpable dict.

    The result holds the label, the slug, the names of the view/edit roles
    (``None`` when a role is not set) and the exported form of every time
    period exception attached to the calendar.
    """
    view_role, edit_role = self.view_role, self.edit_role
    permissions = {
        'view': view_role.name if view_role else None,
        'edit': edit_role.name if edit_role else None,
    }
    payload = {
        'label': self.label,
        'slug': self.slug,
        'permissions': permissions,
    }
    # Each exception serializes itself; the calendar only aggregates them.
    payload['exceptions'] = [item.export_json() for item in self.timeperiodexception_set.all()]
    return payload
|
1586 | ||
1587 |
@classmethod
def import_json(cls, data, overwrite=False):
    """Create or update an unavailability calendar from exported data.

    :param data: dict with the keys written by ``export_json`` ('label',
        'slug', 'permissions', 'exceptions'). Neither the dict nor the
        nested exception dicts are modified.
    :param overwrite: when true, the calendar's existing time period
        exceptions are deleted before the ones in ``data`` are imported.
    :returns: ``True`` if the calendar was created, ``False`` if an existing
        calendar with the same slug was updated.
    :raises Group.DoesNotExist: if a named role matches no group (from
        ``Group.objects.get``).
    """
    data = data.copy()
    permissions = data.pop('permissions', {})
    exceptions = data.pop('exceptions', [])
    for permission in ('view', 'edit'):
        role_name = permissions.get(permission)
        if role_name:
            data[permission + '_role'] = Group.objects.get(name=role_name)
    data = clean_import_data(cls, data)
    unavailability_calendar, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
    if overwrite:
        TimePeriodException.objects.filter(unavailability_calendar=unavailability_calendar).delete()
    for exception in exceptions:
        # ``data.copy()`` above is shallow: writing into ``exception`` would
        # leak a model instance into the caller's payload, so import from a
        # per-exception copy instead.
        exception_data = dict(exception, unavailability_calendar=unavailability_calendar)
        TimePeriodException.import_json(exception_data)

    return created
|
1604 | ||
1565 | 1605 | |
1566 | 1606 |
class TimePeriodException(models.Model): |
1567 | 1607 |
desk = models.ForeignKey(Desk, on_delete=models.CASCADE, null=True) |
chrono/manager/utils.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import collections |
|
17 | 18 |
import itertools |
18 | 19 | |
19 | 20 |
from django.contrib.auth.models import Group |
20 | 21 |
from django.db import transaction |
21 | 22 |
from django.db.models import Q |
22 | 23 | |
23 |
from chrono.agendas.models import Agenda, AgendaImportError |
|
24 |
from chrono.agendas.models import Agenda, AgendaImportError, UnavailabilityCalendar
|
|
24 | 25 | |
25 | 26 | |
26 | 27 |
def export_site(): |
27 | 28 |
'''Dump site objects to JSON-dumpable dictionary''' |
28 |
data = {} |
|
29 |
data = collections.OrderedDict() |
|
30 |
data['unavailability_calendars'] = [x.export_json() for x in UnavailabilityCalendar.objects.all()] |
|
29 | 31 |
qs1 = Agenda.objects.filter(~Q(kind='virtual')) |
30 | 32 |
qs2 = Agenda.objects.filter(kind='virtual') |
31 | 33 |
data['agendas'] = [x.export_json() for x in itertools.chain(qs1, qs2)] |
... | ... | |
33 | 35 | |
34 | 36 | |
35 | 37 |
def import_site(data, if_empty=False, clean=False, overwrite=False):
    """Import unavailability calendars and agendas from an exported site dict.

    Unavailability calendars are imported before agendas, so the calendars
    listed in the payload exist by the time the agendas are imported.

    :param data: dict with optional 'unavailability_calendars' and 'agendas'
        lists of exported objects.
    :param if_empty: do nothing when an agenda or an unavailability calendar
        already exists.
    :param clean: delete every agenda and unavailability calendar before
        importing.
    :param overwrite: forwarded to each object's ``import_json``.
    :returns: ``{'created': n, 'updated': m}``, or ``None`` when the import
        was skipped because of ``if_empty``.
    :raises AgendaImportError: if a role named in any object's permissions
        does not match an existing group.
    """
    if if_empty and (Agenda.objects.exists() or UnavailabilityCalendar.objects.exists()):
        return

    if clean:
        Agenda.objects.all().delete()
        # Must be delete(): the previous count() left every calendar in place.
        UnavailabilityCalendar.objects.all().delete()

    results = {'created': 0, 'updated': 0}
    agendas = data.get('agendas', [])
    unavailability_calendars = data.get('unavailability_calendars', [])

    # Collect role names from BOTH kinds of objects; iterating only the
    # agendas would let a calendar with an unknown role slip through.
    role_names = set()
    for objs in (agendas, unavailability_calendars):
        for obj_data in objs:
            role_names.update(name for name in obj_data.get('permissions', {}).values() if name)

    existing_roles = Group.objects.filter(name__in=role_names)

    if existing_roles.count() != len(role_names):
        existing_roles_names = set(existing_roles.values_list('name', flat=True))
        # Sorted so the error message is deterministic.
        raise AgendaImportError('Missing roles: "%s"' % ', '.join(sorted(role_names - existing_roles_names)))

    with transaction.atomic():
        for objs, cls in ((unavailability_calendars, UnavailabilityCalendar), (agendas, Agenda)):
            for obj_data in objs:
                created = cls.import_json(obj_data, overwrite=overwrite)
                results['created' if created else 'updated'] += 1
    return results
tests/test_import_export.py | ||
---|---|---|
32 | 32 |
VirtualMember, |
33 | 33 |
AgendaNotificationsSettings, |
34 | 34 |
AgendaReminderSettings, |
35 |
UnavailabilityCalendar, |
|
35 | 36 |
) |
36 | 37 |
from chrono.manager.utils import import_site |
37 | 38 | |
... | ... | |
565 | 566 |
source = desk.timeperiodexceptionsource_set.first() |
566 | 567 |
assert not source.enabled |
567 | 568 |
assert not desk.timeperiodexception_set.exists() |
569 | ||
570 | ||
571 |
def test_import_export_unavailability_calendar(app):
    """Round-trip a calendar, its exceptions and its desk link through export_site/import_site."""
    # Nothing has been created yet, so the export lists no calendar.
    initial_export = json.loads(get_output_of_command('export_site'))
    assert len(initial_export['unavailability_calendars']) == 0

    view_group = Group.objects.create(name=u'gé1')
    edit_group = Group.objects.create(name=u'gé2')
    calendar = UnavailabilityCalendar.objects.create(
        label='Calendar', view_role=view_group, edit_role=edit_group
    )
    # Same day and hours in two different years.
    periods = [
        (
            make_aware(datetime.datetime(year, 5, 22, 8, 0)),
            make_aware(datetime.datetime(year, 5, 22, 12, 30)),
        )
        for year in (2017, 2018)
    ]
    created_exceptions = [
        TimePeriodException.objects.create(
            unavailability_calendar=calendar, start_datetime=start, end_datetime=end
        )
        for start, end in periods
    ]
    source_agenda = Agenda.objects.create(label='Foo Bar', kind='meetings')
    MeetingType.objects.create(agenda=source_agenda, label='Meeting Type', duration=30)
    source_desk = Desk.objects.create(agenda=source_agenda, label='Desk')
    source_desk.unavailability_calendars.add(calendar)

    payload = json.loads(get_output_of_command('export_site'))
    assert len(payload['unavailability_calendars']) == 1
    assert len(payload['agendas']) == 1

    # Remove everything so the import has to recreate it.
    calendar.delete()
    for created_exception in created_exceptions:
        created_exception.delete()
    source_agenda.delete()
    for model in (UnavailabilityCalendar, TimePeriodException, Agenda, Desk):
        assert not model.objects.exists()

    import_site(copy.deepcopy(payload))
    assert UnavailabilityCalendar.objects.count() == 1
    imported_calendar = UnavailabilityCalendar.objects.first()
    assert imported_calendar.label == 'Calendar'
    assert imported_calendar.view_role == view_group
    assert imported_calendar.edit_role == edit_group
    assert imported_calendar.timeperiodexception_set.count() == 2
    for start, end in periods:
        assert TimePeriodException.objects.get(
            unavailability_calendar=imported_calendar, start_datetime=start, end_datetime=end
        )

    imported_desk = Agenda.objects.get(label='Foo Bar').desk_set.first()
    assert imported_desk.unavailability_calendars.count() == 1
    assert imported_desk.unavailability_calendars.first() == imported_calendar

    # update
    update_payload = copy.deepcopy(payload)
    update_payload['unavailability_calendars'][0]['label'] = 'Calendar Updated'
    import_site(update_payload)
    imported_calendar.refresh_from_db()
    assert imported_calendar.label == 'Calendar Updated'
|
568 |
- |