Projet

Général

Profil

0001-agendas-handle-import-export-of-unavailability-calen.patch

Emmanuel Cazenave, 26 octobre 2020 17:58

Télécharger (9,68 ko)

Voir les différences:

Subject: [PATCH] agendas: handle import/export of unavailability calendars
 (#47394)

 chrono/agendas/models.py    | 43 +++++++++++++++++++++++++++
 chrono/manager/utils.py     | 29 ++++++++++++-------
 tests/test_import_export.py | 58 +++++++++++++++++++++++++++++++++++++
 3 files changed, 120 insertions(+), 10 deletions(-)
chrono/agendas/models.py
1149 1149
    def import_json(cls, data):
1150 1150
        timeperiods = data.pop('timeperiods', [])
1151 1151
        exceptions = data.pop('exceptions', [])
1152
        unavailability_calendars = data.pop('unavailability_calendars', [])
1152 1153
        data = clean_import_data(cls, data)
1153 1154
        desk, created = cls.objects.update_or_create(slug=data['slug'], agenda=data['agenda'], defaults=data)
1154 1155
        for timeperiod in timeperiods:
......
1157 1158
        for exception in exceptions:
1158 1159
            exception['desk'] = desk
1159 1160
            TimePeriodException.import_json(exception)
1161
        for unavailability_calendar in unavailability_calendars:
1162
            try:
1163
                slug = unavailability_calendar['slug']
1164
                target_calendar = UnavailabilityCalendar.objects.get(slug=slug)
1165
                desk.unavailability_calendars.add(target_calendar)
1166
            except UnavailabilityCalendar.DoesNotExist:
1167
                raise AgendaImportError(_('The unavailability calendar "%s" does not exist.') % slug)
1168

  
1160 1169
        return desk
1161 1170

  
1162 1171
    def export_json(self):
......
1165 1174
            'slug': self.slug,
1166 1175
            'timeperiods': [time_period.export_json() for time_period in self.timeperiod_set.all()],
1167 1176
            'exceptions': [exception.export_json() for exception in self.timeperiodexception_set.all()],
1177
            'unavailability_calendars': [{'slug': x.slug} for x in self.unavailability_calendars.all()],
1168 1178
        }
1169 1179

  
1170 1180
    def duplicate(self, label=None, agenda_target=None):
......
1543 1553
    def get_absolute_url(self):
1544 1554
        return reverse('chrono-manager-unavailability-calendar-view', kwargs={'pk': self.id})
1545 1555

  
1556
    def export_json(self):
1557
        unavailability_calendar = {
1558
            'label': self.label,
1559
            'slug': self.slug,
1560
            'permissions': {
1561
                'view': self.view_role.name if self.view_role else None,
1562
                'edit': self.edit_role.name if self.edit_role else None,
1563
            },
1564
            'exceptions': [exception.export_json() for exception in self.timeperiodexception_set.all()],
1565
        }
1566
        return unavailability_calendar
1567

  
1568
    @classmethod
1569
    def import_json(cls, data, overwrite=False):
1570
        data = data.copy()
1571
        permissions = data.pop('permissions', {})
1572
        exceptions = data.pop('exceptions', [])
1573
        for permission in ('view', 'edit'):
1574
            if permissions.get(permission):
1575
                data[permission + '_role'] = Group.objects.get(name=permissions[permission])
1576
        data = clean_import_data(cls, data)
1577
        unavailability_calendar, created = cls.objects.get_or_create(slug=data['slug'], defaults=data)
1578
        if not created:
1579
            for k, v in data.items():
1580
                setattr(unavailability_calendar, k, v)
1581
        if overwrite:
1582
            TimePeriodException.objects.filter(unavailability_calendar=unavailability_calendar).delete()
1583
        for exception in exceptions:
1584
            exception['unavailability_calendar'] = unavailability_calendar
1585
            TimePeriodException.import_json(exception)
1586

  
1587
        return created
1588

  
1546 1589

  
1547 1590
class TimePeriodException(models.Model):
1548 1591
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE, null=True)
chrono/manager/utils.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import collections
17 18
import itertools
18 19

  
19 20
from django.contrib.auth.models import Group
20 21
from django.db import transaction
21 22
from django.db.models import Q
22 23

  
23
from chrono.agendas.models import Agenda, AgendaImportError
24
from chrono.agendas.models import Agenda, AgendaImportError, UnavailabilityCalendar
24 25

  
25 26

  
26 27
def export_site():
27 28
    '''Dump site objects to JSON-dumpable dictionnary'''
28
    data = {}
29
    data = collections.OrderedDict()
30
    data['unavailability_calendars'] = [x.export_json() for x in UnavailabilityCalendar.objects.all()]
29 31
    qs1 = Agenda.objects.filter(~Q(kind='virtual'))
30 32
    qs2 = Agenda.objects.filter(kind='virtual')
31 33
    data['agendas'] = [x.export_json() for x in itertools.chain(qs1, qs2)]
......
33 35

  
34 36

  
35 37
def import_site(data, if_empty=False, clean=False, overwrite=False):
36
    if if_empty and Agenda.objects.count():
38
    if if_empty and (Agenda.objects.count() or UnavailabilityCalendar.objects.count()):
37 39
        return
38 40

  
39 41
    if clean:
40 42
        Agenda.objects.all().delete()
43
        UnavailabilityCalendar.objects.all().count()
41 44

  
42 45
    results = {'created': 0, 'updated': 0}
43 46
    agendas = data.get('agendas', [])
47
    unavailability_calendars = data.get('unavailability_calendars', [])
44 48

  
45
    role_names = {name for data in agendas for _, name in data.get('permissions', {}).items() if name}
49
    role_names = set()
50
    for objs in (agendas, unavailability_calendars):
51
        role_names = role_names.union(
52
            {name for data in agendas for _, name in data.get('permissions', {}).items() if name}
53
        )
46 54
    existing_roles = Group.objects.filter(name__in=role_names)
47 55

  
48 56
    if existing_roles.count() != len(role_names):
......
50 58
        raise AgendaImportError('Missing roles: "%s"' % ', '.join(role_names - existing_roles_names))
51 59

  
52 60
    with transaction.atomic():
53
        for data in agendas:
54
            created = Agenda.import_json(data, overwrite=overwrite)
55
            if created:
56
                results['created'] += 1
57
            else:
58
                results['updated'] += 1
61
        for objs, cls in ((unavailability_calendars, UnavailabilityCalendar), (agendas, Agenda)):
62
            for data in objs:
63
                created = cls.import_json(data, overwrite=overwrite)
64
                if created:
65
                    results['created'] += 1
66
                else:
67
                    results['updated'] += 1
59 68
    return results
tests/test_import_export.py
30 30
    VirtualMember,
31 31
    AgendaNotificationsSettings,
32 32
    AgendaReminderSettings,
33
    UnavailabilityCalendar,
33 34
)
34 35
from chrono.manager.utils import import_site
35 36

  
......
531 532

  
532 533
    # again - check OneToOneField
533 534
    import_site(payload)
535

  
536

  
537
def test_import_export_unavailability_calendar(app):
538
    output = get_output_of_command('export_site')
539
    payload = json.loads(output)
540
    assert len(payload['unavailability_calendars']) == 0
541

  
542
    group1 = Group.objects.create(name=u'gé1')
543
    group2 = Group.objects.create(name=u'gé2')
544
    calendar = UnavailabilityCalendar.objects.create(label='Calendar', view_role=group1, edit_role=group2)
545
    tp1_start = make_aware(datetime.datetime(2017, 5, 22, 8, 0))
546
    tp1_end = make_aware(datetime.datetime(2017, 5, 22, 12, 30))
547
    tp1 = TimePeriodException.objects.create(
548
        unavailability_calendar=calendar, start_datetime=tp1_start, end_datetime=tp1_end
549
    )
550
    tp2_start = make_aware(datetime.datetime(2018, 5, 22, 8, 0))
551
    tp2_end = make_aware(datetime.datetime(2018, 5, 22, 12, 30))
552
    tp2 = TimePeriodException.objects.create(
553
        unavailability_calendar=calendar, start_datetime=tp2_start, end_datetime=tp2_end
554
    )
555
    meetings_agenda = Agenda.objects.create(label='Foo Bar', kind='meetings')
556
    MeetingType.objects.create(agenda=meetings_agenda, label='Meeting Type', duration=30)
557
    desk = Desk.objects.create(agenda=meetings_agenda, label='Desk')
558
    desk.unavailability_calendars.add(calendar)
559

  
560
    output = get_output_of_command('export_site')
561
    payload = json.loads(output)
562
    assert len(payload['unavailability_calendars']) == 1
563
    assert len(payload['agendas']) == 1
564

  
565
    calendar.delete()
566
    tp1.delete()
567
    tp2.delete()
568
    meetings_agenda.delete()
569
    assert not UnavailabilityCalendar.objects.exists()
570
    assert not TimePeriodException.objects.exists()
571
    assert not Agenda.objects.exists()
572
    assert not Desk.objects.exists()
573

  
574
    import_site(payload)
575
    assert UnavailabilityCalendar.objects.count() == 1
576
    calendar = UnavailabilityCalendar.objects.first()
577
    assert calendar.label == 'Calendar'
578
    assert calendar.view_role == group1
579
    assert calendar.edit_role == group2
580
    assert calendar.timeperiodexception_set.count() == 2
581
    assert TimePeriodException.objects.get(
582
        unavailability_calendar=calendar, start_datetime=tp1_start, end_datetime=tp1_end
583
    )
584
    assert TimePeriodException.objects.get(
585
        unavailability_calendar=calendar, start_datetime=tp2_start, end_datetime=tp2_end
586
    )
587

  
588
    agenda = Agenda.objects.get(label='Foo Bar')
589
    desk = agenda.desk_set.first()
590
    assert desk.unavailability_calendars.count() == 1
591
    assert desk.unavailability_calendars.first() == calendar
534
-