Projet

Général

Profil

0001-agendas-handle-import-export-of-unavailability-calen.patch

Emmanuel Cazenave, 05 novembre 2020 12:57

Télécharger (21,9 ko)

Voir les différences:

Subject: [PATCH] agendas: handle import/export of unavailability calendars
 (#47394)

 chrono/agendas/models.py                      | 40 +++++++++
 chrono/manager/forms.py                       |  2 +-
 .../templates/chrono/agendas_import.html      |  2 +-
 ...ager_unavailability_calendar_settings.html |  1 +
 chrono/manager/urls.py                        |  5 ++
 chrono/manager/utils.py                       | 37 ++++++---
 chrono/manager/views.py                       | 73 ++++++++++++----
 tests/test_import_export.py                   | 65 +++++++++++++++
 tests/test_manager.py                         | 83 ++++++++++++++++++-
 9 files changed, 277 insertions(+), 31 deletions(-)
chrono/agendas/models.py
1146 1146
        timeperiods = data.pop('timeperiods', [])
1147 1147
        exceptions = data.pop('exceptions', [])
1148 1148
        sources = data.pop('exception_sources', [])
1149
        unavailability_calendars = data.pop('unavailability_calendars', [])
1149 1150
        data = clean_import_data(cls, data)
1150 1151
        desk, created = cls.objects.update_or_create(slug=data['slug'], agenda=data['agenda'], defaults=data)
1151 1152
        for timeperiod in timeperiods:
......
1157 1158
        for source in sources:
1158 1159
            source['desk'] = desk
1159 1160
            TimePeriodExceptionSource.import_json(source)
1161
        for unavailability_calendar in unavailability_calendars:
1162
            slug = unavailability_calendar['slug']
1163
            try:
1164
                target_calendar = UnavailabilityCalendar.objects.get(slug=slug)
1165
            except UnavailabilityCalendar.DoesNotExist:
1166
                raise AgendaImportError(_('The unavailability calendar "%s" does not exist.') % slug)
1167
            desk.unavailability_calendars.add(target_calendar)
1168

  
1160 1169
        return desk
1161 1170

  
1162 1171
    def export_json(self):
......
1168 1177
            'timeperiods': [time_period.export_json() for time_period in self.timeperiod_set.filter()],
1169 1178
            'exceptions': [exception.export_json() for exception in time_period_exceptions],
1170 1179
            'exception_sources': [source.export_json() for source in time_period_exception_sources],
1180
            'unavailability_calendars': [{'slug': x.slug} for x in self.unavailability_calendars.all()],
1171 1181
        }
1172 1182

  
1173 1183
    def duplicate(self, label=None, agenda_target=None):
......
1561 1571
    def get_absolute_url(self):
1562 1572
        return reverse('chrono-manager-unavailability-calendar-view', kwargs={'pk': self.id})
1563 1573

  
1574
    def export_json(self):
1575
        unavailability_calendar = {
1576
            'label': self.label,
1577
            'slug': self.slug,
1578
            'permissions': {
1579
                'view': self.view_role.name if self.view_role else None,
1580
                'edit': self.edit_role.name if self.edit_role else None,
1581
            },
1582
            'exceptions': [exception.export_json() for exception in self.timeperiodexception_set.all()],
1583
        }
1584
        return unavailability_calendar
1585

  
1586
    @classmethod
1587
    def import_json(cls, data, overwrite=False):
1588
        data = data.copy()
1589
        permissions = data.pop('permissions', {})
1590
        exceptions = data.pop('exceptions', [])
1591
        for permission in ('view', 'edit'):
1592
            if permissions.get(permission):
1593
                data[permission + '_role'] = Group.objects.get(name=permissions[permission])
1594
        data = clean_import_data(cls, data)
1595
        unavailability_calendar, created = cls.objects.update_or_create(slug=data['slug'], defaults=data)
1596
        if overwrite:
1597
            TimePeriodException.objects.filter(unavailability_calendar=unavailability_calendar).delete()
1598
        for exception in exceptions:
1599
            exception['unavailability_calendar'] = unavailability_calendar
1600
            TimePeriodException.import_json(exception)
1601

  
1602
        return created
1603

  
1564 1604

  
1565 1605
class TimePeriodException(models.Model):
1566 1606
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE, null=True)
chrono/manager/forms.py
525 525

  
526 526

  
527 527
class AgendasImportForm(forms.Form):
528
    agendas_json = forms.FileField(label=_('Agendas Export File'))
528
    agendas_json = forms.FileField(label=_('Export File'))
529 529

  
530 530

  
531 531
class AgendaDuplicateForm(forms.Form):
chrono/manager/templates/chrono/agendas_import.html
2 2
{% load i18n %}
3 3

  
4 4
{% block appbar %}
5
<h2>{% trans "Agendas Import" %}</h2>
5
<h2>{% trans "Import" %}</h2>
6 6
{% endblock %}
7 7

  
8 8
{% block content %}
chrono/manager/templates/chrono/manager_unavailability_calendar_settings.html
16 16
  {% endblock %}
17 17
  <ul class="extra-actions-menu">
18 18
    <li><a rel="popup" href="{% url 'chrono-manager-unavailability-calendar-edit' pk=unavailability_calendar.id %}">{% trans 'Options' %}</a></li>
19
    <li><a download href="{% url 'chrono-manager-unavailability-calendar-export' pk=unavailability_calendar.id %}">{% trans 'Export Configuration (JSON)' %}</a></li>
19 20
    {% if user.is_staff %}
20 21
      <li><a rel="popup" href="{% url 'chrono-manager-unavailability-calendar-delete' pk=unavailability_calendar.id %}">{% trans 'Delete' %}</a></li>
21 22
    {% endif %}
chrono/manager/urls.py
50 50
        views.unavailability_calendar_settings,
51 51
        name='chrono-manager-unavailability-calendar-settings',
52 52
    ),
53
    url(
54
        r'^unavailability-calendar/(?P<pk>\d+)/export$',
55
        views.unavailability_calendar_export,
56
        name='chrono-manager-unavailability-calendar-export',
57
    ),
53 58
    url(
54 59
        r'^unavailability-calendar/(?P<pk>\d+)/add-unavailability$',
55 60
        views.unavailability_calendar_add_unavailability,
chrono/manager/utils.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import collections
17 18
import itertools
18 19

  
19 20
from django.contrib.auth.models import Group
20 21
from django.db import transaction
21 22
from django.db.models import Q
22 23

  
23
from chrono.agendas.models import Agenda, AgendaImportError
24
from chrono.agendas.models import Agenda, AgendaImportError, UnavailabilityCalendar
24 25

  
25 26

  
26 27
def export_site():
27 28
    '''Dump site objects to JSON-dumpable dictionnary'''
28
    data = {}
29
    data = collections.OrderedDict()
30
    data['unavailability_calendars'] = [x.export_json() for x in UnavailabilityCalendar.objects.all()]
29 31
    qs1 = Agenda.objects.filter(~Q(kind='virtual'))
30 32
    qs2 = Agenda.objects.filter(kind='virtual')
31 33
    data['agendas'] = [x.export_json() for x in itertools.chain(qs1, qs2)]
......
33 35

  
34 36

  
35 37
def import_site(data, if_empty=False, clean=False, overwrite=False):
36
    if if_empty and Agenda.objects.count():
38
    if if_empty and (Agenda.objects.count() or UnavailabilityCalendar.objects.count()):
37 39
        return
38 40

  
39 41
    if clean:
40 42
        Agenda.objects.all().delete()
43
        UnavailabilityCalendar.objects.all().count()
41 44

  
42
    results = {'created': 0, 'updated': 0}
45
    results = {
46
        'agendas': {'created': 0, 'updated': 0},
47
        'unavailability_calendars': {'created': 0, 'updated': 0},
48
    }
43 49
    agendas = data.get('agendas', [])
50
    unavailability_calendars = data.get('unavailability_calendars', [])
44 51

  
45
    role_names = {name for data in agendas for _, name in data.get('permissions', {}).items() if name}
52
    role_names = set()
53
    for objs in (agendas, unavailability_calendars):
54
        role_names = role_names.union(
55
            {name for data in objs for _, name in data.get('permissions', {}).items() if name}
56
        )
46 57
    existing_roles = Group.objects.filter(name__in=role_names)
47 58

  
48 59
    if existing_roles.count() != len(role_names):
......
50 61
        raise AgendaImportError('Missing roles: "%s"' % ', '.join(role_names - existing_roles_names))
51 62

  
52 63
    with transaction.atomic():
53
        for data in agendas:
54
            created = Agenda.import_json(data, overwrite=overwrite)
55
            if created:
56
                results['created'] += 1
57
            else:
58
                results['updated'] += 1
64
        for objs, cls, label in (
65
            (unavailability_calendars, UnavailabilityCalendar, 'unavailability_calendars'),
66
            (agendas, Agenda, 'agendas'),
67
        ):
68
            for data in objs:
69
                created = cls.import_json(data, overwrite=overwrite)
70
                if created:
71
                    results[label]['created'] += 1
72
                else:
73
                    results[label]['updated'] += 1
59 74
    return results
chrono/manager/views.py
618 618
            form.add_error('agendas_json', _('Key "%s" is missing.') % exc.args[0])
619 619
            return self.form_invalid(form)
620 620

  
621
        if results.get('created') == 0 and results.get('updated') == 0:
622
            messages.info(self.request, _('No agendas were found.'))
623
        else:
624
            if results.get('created') == 0:
625
                message1 = _('No agenda created.')
626
            else:
627
                message1 = ungettext(
628
                    'An agenda has been created.', '%(count)d agendas have been created.', results['created']
629
                ) % {'count': results['created']}
630

  
631
            if results.get('updated') == 0:
632
                message2 = _('No agenda updated.')
633
            else:
634
                message2 = ungettext(
635
                    'An agenda has been updated.', '%(count)d agendas have been updated.', results['updated']
636
                ) % {'count': results['updated']}
637
            messages.info(self.request, u'%s %s' % (message1, message2))
621
        global_noop = True
622
        for obj_name, obj_results in results.items():
623
            if obj_name == 'agendas':
624
                message_create_noop = _('No agenda created.')
625
                message_create = ('An agenda has been created.', '%(count)d agendas have been created.')
626
                message_update_noop = _('No agenda updated.')
627
                message_update = ('An agenda has been updated.', '%(count)d agendas have been updated.')
628
            elif obj_name == 'unavailability_calendars':
629
                message_create_noop = _('No unavailability calendar created.')
630
                message_create = (
631
                    'An unavailability calendar has been created.',
632
                    '%(count)d unavailability calendars have been created.',
633
                )
634
                message_update_noop = _('No unavailability calendar updated.')
635
                message_update = (
636
                    'An unavailability calendar has been updated.',
637
                    '%(count)d unavailability calendars have been updated.',
638
                )
639

  
640
            if obj_results.get('created') != 0 or obj_results.get('updated') != 0:
641
                global_noop = False
642
                if obj_results.get('created') == 0:
643
                    message1 = message_create_noop
644
                else:
645
                    message1 = ungettext(message_create[0], message_create[1], obj_results['created']) % {
646
                        'count': obj_results['created']
647
                    }
648

  
649
                if obj_results.get('updated') == 0:
650
                    message2 = message_update_noop
651
                else:
652
                    message2 = ungettext(message_update[0], message_update[1], obj_results['updated']) % {
653
                        'count': obj_results['updated']
654
                    }
655
                messages.info(self.request, u'%s %s' % (message1, message2))
656

  
657
        if global_noop:
658
            messages.info(self.request, _('No data found.'))
638 659

  
639 660
        return super(AgendasImportView, self).form_valid(form)
640 661

  
......
2364 2385
unavailability_calendar_settings = UnavailabilityCalendarSettings.as_view()
2365 2386

  
2366 2387

  
2388
class UnavailabilityCalendarExport(ManagedUnavailabilityCalendarMixin, DetailView):
2389
    model = UnavailabilityCalendar
2390

  
2391
    def get(self, request, *args, **kwargs):
2392
        response = HttpResponse(content_type='application/json')
2393
        today = datetime.date.today()
2394
        response[
2395
            'Content-Disposition'
2396
        ] = 'attachment; filename="export_unavailability-calendar_{}_{}.json"'.format(
2397
            self.get_object().slug, today.strftime('%Y%m%d')
2398
        )
2399
        json.dump({'unavailability_calendars': [self.get_object().export_json()]}, response, indent=2)
2400
        return response
2401

  
2402

  
2403
unavailability_calendar_export = UnavailabilityCalendarExport.as_view()
2404

  
2405

  
2367 2406
class UnavailabilityCalendarAddUnavailabilityView(ManagedUnavailabilityCalendarMixin, CreateView):
2368 2407
    template_name = 'chrono/manager_time_period_exception_form.html'
2369 2408
    form_class = TimePeriodExceptionForm
tests/test_import_export.py
32 32
    VirtualMember,
33 33
    AgendaNotificationsSettings,
34 34
    AgendaReminderSettings,
35
    UnavailabilityCalendar,
35 36
)
36 37
from chrono.manager.utils import import_site
37 38

  
......
594 595

  
595 596
    assert TimePeriod.objects.count() == 2
596 597
    assert TimePeriodException.objects.count() == 2
598

  
599

  
600
def test_import_export_unavailability_calendar(app):
601
    output = get_output_of_command('export_site')
602
    payload = json.loads(output)
603
    assert len(payload['unavailability_calendars']) == 0
604

  
605
    group1 = Group.objects.create(name=u'gé1')
606
    group2 = Group.objects.create(name=u'gé2')
607
    calendar = UnavailabilityCalendar.objects.create(label='Calendar', view_role=group1, edit_role=group2)
608
    tp1_start = make_aware(datetime.datetime(2017, 5, 22, 8, 0))
609
    tp1_end = make_aware(datetime.datetime(2017, 5, 22, 12, 30))
610
    tp1 = TimePeriodException.objects.create(
611
        unavailability_calendar=calendar, start_datetime=tp1_start, end_datetime=tp1_end
612
    )
613
    tp2_start = make_aware(datetime.datetime(2018, 5, 22, 8, 0))
614
    tp2_end = make_aware(datetime.datetime(2018, 5, 22, 12, 30))
615
    tp2 = TimePeriodException.objects.create(
616
        unavailability_calendar=calendar, start_datetime=tp2_start, end_datetime=tp2_end
617
    )
618
    meetings_agenda = Agenda.objects.create(label='Foo Bar', kind='meetings')
619
    MeetingType.objects.create(agenda=meetings_agenda, label='Meeting Type', duration=30)
620
    desk = Desk.objects.create(agenda=meetings_agenda, label='Desk')
621
    desk.unavailability_calendars.add(calendar)
622

  
623
    output = get_output_of_command('export_site')
624
    payload = json.loads(output)
625
    assert len(payload['unavailability_calendars']) == 1
626
    assert len(payload['agendas']) == 1
627

  
628
    calendar.delete()
629
    tp1.delete()
630
    tp2.delete()
631
    meetings_agenda.delete()
632
    assert not UnavailabilityCalendar.objects.exists()
633
    assert not TimePeriodException.objects.exists()
634
    assert not Agenda.objects.exists()
635
    assert not Desk.objects.exists()
636

  
637
    import_site(copy.deepcopy(payload))
638
    assert UnavailabilityCalendar.objects.count() == 1
639
    calendar = UnavailabilityCalendar.objects.first()
640
    assert calendar.label == 'Calendar'
641
    assert calendar.view_role == group1
642
    assert calendar.edit_role == group2
643
    assert calendar.timeperiodexception_set.count() == 2
644
    assert TimePeriodException.objects.get(
645
        unavailability_calendar=calendar, start_datetime=tp1_start, end_datetime=tp1_end
646
    )
647
    assert TimePeriodException.objects.get(
648
        unavailability_calendar=calendar, start_datetime=tp2_start, end_datetime=tp2_end
649
    )
650

  
651
    agenda = Agenda.objects.get(label='Foo Bar')
652
    desk = agenda.desk_set.first()
653
    assert desk.unavailability_calendars.count() == 1
654
    assert desk.unavailability_calendars.first() == calendar
655

  
656
    # update
657
    update_payload = copy.deepcopy(payload)
658
    update_payload['unavailability_calendars'][0]['label'] = 'Calendar Updated'
659
    import_site(update_payload)
660
    calendar.refresh_from_db()
661
    assert calendar.label == 'Calendar Updated'
tests/test_manager.py
3534 3534
    resp = resp.click('Import')
3535 3535
    resp.form['agendas_json'] = Upload('export.json', b'{}', 'application/json')
3536 3536
    resp = resp.form.submit().follow()
3537
    assert 'No agendas were found.' in resp.text
3537
    assert 'No data found.' in resp.text
3538 3538

  
3539 3539
    # existing agenda
3540 3540
    resp = app.get('/manage/', status=200)
......
3599 3599
    assert resp.context['form'].errors['agendas_json'] == ['Key "kind" is missing.']
3600 3600

  
3601 3601

  
3602
def test_import_unavailability_calendar(app, admin_user):
3603
    calendar = UnavailabilityCalendar.objects.create(label=u'Foo bar')
3604

  
3605
    app = login(app)
3606
    with freezegun.freeze_time('2020-06-15'):
3607
        resp = app.get('/manage/unavailability-calendar/%s/export' % calendar.id)
3608
    assert resp.headers['content-type'] == 'application/json'
3609
    assert (
3610
        resp.headers['content-disposition']
3611
        == 'attachment; filename="export_unavailability-calendar_foo-bar_20200615.json"'
3612
    )
3613
    calendar_export = resp.text
3614

  
3615
    # empty json
3616
    resp = app.get('/manage/', status=200)
3617
    resp = resp.click('Import')
3618
    resp.form['agendas_json'] = Upload('export.json', b'{}', 'application/json')
3619
    resp = resp.form.submit().follow()
3620
    assert 'No data found.' in resp.text
3621

  
3622
    # existing unavailability calendar
3623
    resp = app.get('/manage/', status=200)
3624
    resp = resp.click('Import')
3625
    resp.form['agendas_json'] = Upload('export.json', calendar_export.encode('utf-8'), 'application/json')
3626
    resp = resp.form.submit().follow()
3627
    assert 'No unavailability calendar created. An unavailability calendar has been updated.' in resp.text
3628
    assert UnavailabilityCalendar.objects.count() == 1
3629

  
3630
    # new unavailability calendar
3631
    UnavailabilityCalendar.objects.all().delete()
3632
    resp = app.get('/manage/', status=200)
3633
    resp = resp.click('Import')
3634
    resp.form['agendas_json'] = Upload('export.json', calendar_export.encode('utf-8'), 'application/json')
3635
    resp = resp.form.submit().follow()
3636
    assert 'An unavailability calendar has been created. No unavailability calendar updated.' in resp.text
3637
    assert UnavailabilityCalendar.objects.count() == 1
3638

  
3639
    # multiple unavailability calendars
3640
    calendars = json.loads(calendar_export)
3641
    calendars['unavailability_calendars'].append(copy.copy(calendars['unavailability_calendars'][0]))
3642
    calendars['unavailability_calendars'].append(copy.copy(calendars['unavailability_calendars'][0]))
3643
    calendars['unavailability_calendars'][1]['label'] = 'Foo bar 2'
3644
    calendars['unavailability_calendars'][1]['slug'] = 'foo-bar-2'
3645
    calendars['unavailability_calendars'][2]['label'] = 'Foo bar 3'
3646
    calendars['unavailability_calendars'][2]['slug'] = 'foo-bar-3'
3647

  
3648
    resp = app.get('/manage/', status=200)
3649
    resp = resp.click('Import')
3650
    resp.form['agendas_json'] = Upload(
3651
        'export.json', json.dumps(calendars).encode('utf-8'), 'application/json'
3652
    )
3653
    resp = resp.form.submit().follow()
3654
    assert (
3655
        '2 unavailability calendars have been created. An unavailability calendar has been updated.'
3656
        in resp.text
3657
    )
3658
    assert UnavailabilityCalendar.objects.count() == 3
3659

  
3660
    UnavailabilityCalendar.objects.all().delete()
3661
    resp = app.get('/manage/', status=200)
3662
    resp = resp.click('Import')
3663
    resp.form['agendas_json'] = Upload(
3664
        'export.json', json.dumps(calendars).encode('utf-8'), 'application/json'
3665
    )
3666
    resp = resp.form.submit().follow()
3667
    assert '3 unavailability calendars have been created. No unavailability calendar updated.' in resp.text
3668
    assert UnavailabilityCalendar.objects.count() == 3
3669

  
3670
    # reference to unknown group
3671
    calendar_export_dict = json.loads(force_text(calendar_export))
3672
    calendar_export_dict['unavailability_calendars'][0]['permissions']['view'] = u'gé1'
3673
    calendar_export = json.dumps(calendar_export_dict).encode('utf-8')
3674
    UnavailabilityCalendar.objects.all().delete()
3675
    resp = app.get('/manage/', status=200)
3676
    resp = resp.click('Import')
3677
    resp.form['agendas_json'] = Upload('export.json', calendar_export, 'application/json')
3678
    resp = resp.form.submit()
3679
    assert u'Missing roles: &quot;gé1&quot;' in resp.text
3680
    del calendar_export_dict['unavailability_calendars'][0]['permissions']['view']
3681

  
3682

  
3602 3683
def test_import_does_not_delete_bookings(app, admin_user):
3603 3684
    agenda = Agenda.objects.create(label='Foo', kind='meetings')
3604 3685
    meeting_type = MeetingType.objects.create(agenda=agenda, label='Meeting Type', duration=30)
3605
-