Projet

Général

Profil

0002-agendas-allow-exception-sources-in-unavailability-ca.patch

Valentin Deniaud, 24 mars 2022 18:46

Télécharger (20,8 ko)

Voir les différences:

Subject: [PATCH 2/2] agendas: allow exception sources in unavailability
 calendars (#52370)

 .../migrations/0112_auto_20220323_1320.py     |  28 +++
 chrono/agendas/models.py                      |   4 +-
 chrono/manager/forms.py                       |  19 +-
 ...ager_unavailability_calendar_settings.html |   1 +
 chrono/manager/urls.py                        |   5 +
 chrono/manager/views.py                       |  80 ++++++--
 tests/manager/test_exception.py               | 176 ++++++++++++++++++
 7 files changed, 290 insertions(+), 23 deletions(-)
 create mode 100644 chrono/agendas/migrations/0112_auto_20220323_1320.py
chrono/agendas/migrations/0112_auto_20220323_1320.py
1
# Generated by Django 2.2.19 on 2022-03-23 12:20
2

  
3
import django.db.models.deletion
4
from django.db import migrations, models
5

  
6

  
7
class Migration(migrations.Migration):
8

  
9
    dependencies = [
10
        ('agendas', '0111_timeperiod_weekday_indexes'),
11
    ]
12

  
13
    operations = [
14
        migrations.AddField(
15
            model_name='timeperiodexceptionsource',
16
            name='unavailability_calendar',
17
            field=models.ForeignKey(
18
                null=True, on_delete=django.db.models.deletion.CASCADE, to='agendas.UnavailabilityCalendar'
19
            ),
20
        ),
21
        migrations.AlterField(
22
            model_name='timeperiodexceptionsource',
23
            name='desk',
24
            field=models.ForeignKey(
25
                null=True, on_delete=django.db.models.deletion.CASCADE, to='agendas.Desk'
26
            ),
27
        ),
28
    ]
chrono/agendas/models.py
2421 2421

  
2422 2422

  
2423 2423
class TimePeriodExceptionSource(models.Model):
2424
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
2424
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE, null=True)
2425
    unavailability_calendar = models.ForeignKey('UnavailabilityCalendar', on_delete=models.CASCADE, null=True)
2425 2426
    ics_filename = models.CharField(null=True, max_length=256)
2426 2427
    ics_file = models.FileField(upload_to=ics_directory_path, blank=True, null=True)
2427 2428
    ics_url = models.URLField(null=True, max_length=500)
......
2607 2608
                    'end_datetime': end_dt,
2608 2609
                    'label': summary,
2609 2610
                    'desk_id': self.desk_id,
2611
                    'unavailability_calendar_id': self.unavailability_calendar_id,
2610 2612
                    'source': self,
2611 2613
                    'recurrence_id': 0,
2612 2614
                }
chrono/manager/forms.py
1104 1104
        required=False,
1105 1105
        help_text=_('URL to remote calendar which will be synchronised hourly.'),
1106 1106
    )
1107

  
1108
    def clean(self, *args, **kwargs):
1109
        cleaned_data = super().clean(*args, **kwargs)
1110
        if not cleaned_data.get('ics_file') and not cleaned_data.get('ics_url'):
1111
            raise forms.ValidationError(_('Please provide an ICS File or an URL.'))
1112

  
1113

  
1114
class DeskExceptionsImportForm(ExceptionsImportForm):
1107 1115
    all_desks = forms.BooleanField(label=_('Apply exceptions on all desks of the agenda'), required=False)
1108 1116

  
1109 1117
    class Meta:
......
1117 1125
        elif self.instance.agenda.desk_simple_management:
1118 1126
            del self.fields['all_desks']
1119 1127

  
1120
    def clean(self, *args, **kwargs):
1121
        cleaned_data = super().clean(*args, **kwargs)
1122
        if not cleaned_data.get('ics_file') and not cleaned_data.get('ics_url'):
1123
            raise forms.ValidationError(_('Please provide an ICS File or an URL.'))
1128

  
1129
class UnavailabilityCalendarExceptionsImportForm(ExceptionsImportForm):
1130
    class Meta:
1131
        model = UnavailabilityCalendar
1132
        fields = []
1124 1133

  
1125 1134

  
1126 1135
class TimePeriodExceptionSourceReplaceForm(forms.ModelForm):
......
1145 1154

  
1146 1155
        old_filename = self.instance.ics_filename
1147 1156
        store_source(self.instance)
1148
        if self.instance.desk.agenda.desk_simple_management:
1157
        if self.instance.desk and self.instance.desk.agenda.desk_simple_management:
1149 1158
            for desk in self.instance.desk.agenda.desk_set.exclude(pk=self.instance.desk_id):
1150 1159
                source = desk.timeperiodexceptionsource_set.filter(ics_filename=old_filename).first()
1151 1160
                if source is not None:
chrono/manager/templates/chrono/manager_unavailability_calendar_settings.html
12 12
<span class="actions">
13 13
  <a class="extra-actions-menu-opener"></a>
14 14
  {% block agenda-extra-management-actions %}
15
    <a rel="popup" href="{% url 'chrono-manager-unavailability-calendar-import-unavailabilities' pk=unavailability_calendar.id %}">{% trans 'Manage unavailabilities from ICS' %}</a>
15 16
    <a rel="popup" href="{% url 'chrono-manager-unavailability-calendar-add-unavailability' pk=unavailability_calendar.id %}">{% trans 'Add Unavailability' %}</a>
16 17
  {% endblock %}
17 18
  <ul class="extra-actions-menu">
chrono/manager/urls.py
60 60
        views.unavailability_calendar_add_unavailability,
61 61
        name='chrono-manager-unavailability-calendar-add-unavailability',
62 62
    ),
63
    url(
64
        r'^unavailability-calendar/(?P<pk>\d+)/import-unavailabilities/$',
65
        views.unavailability_calendar_import_unavailabilities,
66
        name='chrono-manager-unavailability-calendar-import-unavailabilities',
67
    ),
63 68
    url(r'^resources/$', views.resource_list, name='chrono-manager-resource-list'),
64 69
    url(r'^resource/add/$', views.resource_add, name='chrono-manager-resource-add'),
65 70
    url(r'^resource/(?P<pk>\d+)/$', views.resource_view, name='chrono-manager-resource-view'),
chrono/manager/views.py
103 103
    BookingAbsenceReasonForm,
104 104
    BookingCancelForm,
105 105
    BookingCheckFilterSet,
106
    DeskExceptionsImportForm,
106 107
    DeskForm,
107 108
    EventCancelForm,
108 109
    EventForm,
109 110
    EventsTimesheetForm,
110
    ExceptionsImportForm,
111 111
    ImportEventsForm,
112 112
    MeetingTypeForm,
113 113
    NewDeskForm,
......
123 123
    TimePeriodForm,
124 124
    UnavailabilityCalendarAddForm,
125 125
    UnavailabilityCalendarEditForm,
126
    UnavailabilityCalendarExceptionsImportForm,
126 127
    VirtualMemberForm,
127 128
)
128 129
from .utils import export_site, import_site
......
2780 2781

  
2781 2782
class DeskImportTimePeriodExceptionsView(ManagedAgendaSubobjectMixin, UpdateView):
2782 2783
    model = Desk
2783
    form_class = ExceptionsImportForm
2784
    form_class = DeskExceptionsImportForm
2784 2785
    template_name = 'chrono/manager_import_exceptions.html'
2785 2786

  
2786 2787
    def get_context_data(self, **kwargs):
......
2848 2849
        source = self.get_object()
2849 2850
        response = super().delete(request, *args, **kwargs)
2850 2851

  
2851
        if not source.desk.agenda.desk_simple_management:
2852
        if not source.desk or not source.desk.agenda.desk_simple_management:
2852 2853
            return response
2853 2854

  
2854 2855
        for desk in source.desk.agenda.desk_set.exclude(pk=source.desk_id):
......
2875 2876
        queryset = super().get_queryset()
2876 2877
        return queryset.filter(ics_filename__isnull=False)
2877 2878

  
2878
    def import_file(self, desk, form):
2879
        source = desk.timeperiodexceptionsource_set.filter(
2880
            ics_filename=self.get_object().ics_filename
2881
        ).first()
2879
    def import_file(self, obj, form):
2880
        source = obj.timeperiodexceptionsource_set.filter(ics_filename=self.get_object().ics_filename).first()
2882 2881
        if source is not None:
2883 2882
            source.refresh_timeperiod_exceptions()
2884 2883

  
2885 2884
    def form_valid(self, form):
2886
        desk = self.get_object().desk
2887 2885
        try:
2888
            if desk.agenda.desk_simple_management:
2889
                for _desk in desk.agenda.desk_set.all():
2886
            if self.desk and self.desk.agenda.desk_simple_management:
2887
                for _desk in self.desk.agenda.desk_set.all():
2890 2888
                    self.import_file(_desk, form)
2891 2889
            else:
2892
                self.import_file(desk, form)
2890
                self.import_file(self.desk or self.unavailability_calendar, form)
2893 2891
        except ICSError as e:
2894 2892
            form.add_error(None, force_text(e))
2895 2893
            return self.form_invalid(form)
......
2908 2906
        queryset = super().get_queryset()
2909 2907
        return queryset.filter(ics_url__isnull=False)
2910 2908

  
2911
    def import_file(self, desk):
2912
        source = desk.timeperiodexceptionsource_set.filter(ics_url=self.get_object().ics_url).first()
2909
    def import_file(self, obj):
2910
        source = obj.timeperiodexceptionsource_set.filter(ics_url=self.get_object().ics_url).first()
2913 2911
        if source is not None:
2914 2912
            source.refresh_timeperiod_exceptions()
2915 2913

  
2916 2914
    def get(self, request, *args, **kwargs):
2917
        desk = self.get_object().desk
2918 2915
        try:
2919
            if desk.agenda.desk_simple_management:
2920
                for _desk in desk.agenda.desk_set.all():
2916
            if self.desk and self.desk.agenda.desk_simple_management:
2917
                for _desk in self.desk.agenda.desk_set.all():
2921 2918
                    self.import_file(_desk)
2922 2919
            else:
2923
                self.import_file(desk)
2920
                self.import_file(self.desk or self.unavailability_calendar)
2924 2921
        except ICSError as e:
2925 2922
            messages.error(self.request, force_text(e))
2926 2923

  
......
3352 3349
unavailability_calendar_add_unavailability = UnavailabilityCalendarAddUnavailabilityView.as_view()
3353 3350

  
3354 3351

  
3352
class UnavailabilityCalendarImportUnavailabilitiesView(ManagedUnavailabilityCalendarMixin, UpdateView):
3353
    model = UnavailabilityCalendar
3354
    form_class = UnavailabilityCalendarExceptionsImportForm
3355
    template_name = 'chrono/manager_import_exceptions.html'
3356

  
3357
    def get_context_data(self, **kwargs):
3358
        context = super().get_context_data(**kwargs)
3359
        unavailabilty_calendar = self.get_object()
3360
        context['exception_sources'] = unavailabilty_calendar.timeperiodexceptionsource_set.all()
3361
        context['base_template'] = 'chrono/manager_unavailability_calendar_settings.html'
3362
        return context
3363

  
3364
    def import_file(self, form):
3365
        unavailabilty_calendar = self.get_object()
3366
        if form.cleaned_data['ics_file']:
3367
            ics_file = form.cleaned_data['ics_file']
3368
            source = unavailabilty_calendar.timeperiodexceptionsource_set.create(
3369
                ics_filename=ics_file.name, ics_file=ics_file
3370
            )
3371
            ics_file.seek(0)
3372
        elif form.cleaned_data['ics_url']:
3373
            source = unavailabilty_calendar.timeperiodexceptionsource_set.create(
3374
                ics_url=form.cleaned_data['ics_url']
3375
            )
3376
        parsed = source._check_ics_content()
3377
        source._parsed = parsed
3378
        return source
3379

  
3380
    def form_valid(self, form):
3381
        try:
3382
            with transaction.atomic():
3383
                source = self.import_file(form)
3384
        except ICSError as e:
3385
            form.add_error(None, force_text(e))
3386
            return self.form_invalid(form)
3387

  
3388
        try:
3389
            source.refresh_timeperiod_exceptions(data=source._parsed)
3390
        except ICSError as e:
3391
            form.add_error(None, force_text(e))
3392
            return self.form_invalid(form)
3393

  
3394
        messages.info(self.request, _('Exceptions will be imported in a few minutes.'))
3395
        return super().form_valid(form)
3396

  
3397

  
3398
unavailability_calendar_import_unavailabilities = UnavailabilityCalendarImportUnavailabilitiesView.as_view()
3399

  
3400

  
3355 3401
class SharedCustodyAgendaMixin:
3356 3402
    agenda = None
3357 3403

  
tests/manager/test_exception.py
1380 1380

  
1381 1381
    resp = app.get('/manage/agendas/%s/settings' % agenda.id)
1382 1382
    assert 'warningnotice' not in resp.text
1383

  
1384

  
1385
def test_unavailability_calendar_import_time_period_exception_from_ics(app, admin_user):
1386
    calendar = UnavailabilityCalendar.objects.create(label='Example')
1387
    login(app)
1388
    resp = app.get('/manage/unavailability-calendar/%d/settings' % calendar.pk)
1389
    assert 'Manage unavailabilities from ICS' in resp.text
1390
    resp = resp.click('Manage unavailabilities')
1391
    assert "To add new exceptions, you can upload a file or specify an address to a remote calendar." in resp
1392
    resp = resp.form.submit(status=200)
1393
    assert 'Please provide an ICS File or an URL.' in resp.text
1394
    assert TimePeriodExceptionSource.objects.filter(unavailability_calendar=calendar).count() == 0
1395
    resp.form['ics_file'] = Upload('exceptions.ics', b'invalid content', 'text/calendar')
1396
    resp = resp.form.submit(status=200)
1397
    assert 'File format is invalid' in resp.text
1398
    assert TimePeriodExceptionSource.objects.filter(unavailability_calendar=calendar).count() == 0
1399

  
1400
    ics_with_exceptions = b"""BEGIN:VCALENDAR
1401
VERSION:2.0
1402
PRODID:-//foo.bar//EN
1403
BEGIN:VEVENT
1404
DTSTART:20180101
1405
DTEND:20180101
1406
SUMMARY:New Year's Eve
1407
END:VEVENT
1408
END:VCALENDAR"""
1409
    resp = app.get('/manage/unavailability-calendar/%s/import-unavailabilities/' % calendar.pk)
1410
    resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
1411
    resp = resp.form.submit(status=302)
1412
    assert TimePeriodException.objects.filter(unavailability_calendar=calendar).count() == 1
1413
    assert TimePeriodExceptionSource.objects.filter(unavailability_calendar=calendar).count() == 1
1414
    source = calendar.timeperiodexceptionsource_set.get()
1415
    exception = calendar.timeperiodexception_set.get()
1416
    assert exception.source == source
1417
    assert source.ics_filename == 'exceptions.ics'
1418
    assert 'exceptions.ics' in source.ics_file.name
1419
    assert source.ics_url is None
1420
    resp = resp.follow()
1421
    assert 'Exceptions will be imported in a few minutes.' in resp.text
1422

  
1423

  
1424
@mock.patch('chrono.agendas.models.requests.get')
1425
def test_unavailability_calendar_import_time_period_exception_with_remote_ics(mocked_get, app, admin_user):
1426
    calendar = UnavailabilityCalendar.objects.create(label='Example')
1427
    login(app)
1428
    resp = app.get('/manage/unavailability-calendar/%s/import-unavailabilities/' % calendar.pk)
1429

  
1430
    assert 'ics_file' in resp.form.fields
1431
    assert 'ics_url' in resp.form.fields
1432
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1433
    mocked_response = mock.Mock()
1434
    mocked_response.text = """BEGIN:VCALENDAR
1435
VERSION:2.0
1436
PRODID:-//foo.bar//EN
1437
BEGIN:VEVENT
1438
DTSTART:20180101
1439
DTEND:20180101
1440
SUMMARY:New Year's Eve
1441
END:VEVENT
1442
END:VCALENDAR"""
1443
    mocked_get.return_value = mocked_response
1444
    resp = resp.form.submit(status=302)
1445
    assert TimePeriodException.objects.filter(unavailability_calendar=calendar).count() == 1
1446
    assert TimePeriodExceptionSource.objects.filter(unavailability_calendar=calendar).count() == 1
1447
    source = calendar.timeperiodexceptionsource_set.get()
1448
    exception = calendar.timeperiodexception_set.get()
1449
    assert exception.source == source
1450
    assert source.ics_filename is None
1451
    assert source.ics_file.name == ''
1452
    assert source.ics_url == 'http://example.com/foo.ics'
1453

  
1454

  
1455
def test_unavailability_calendar_delete_time_period_exception_source(app, admin_user):
1456
    calendar = UnavailabilityCalendar.objects.create(label='Example')
1457
    source1 = TimePeriodExceptionSource.objects.create(
1458
        unavailability_calendar=calendar, ics_url='https://example.com/test.ics'
1459
    )
1460
    TimePeriodException.objects.create(
1461
        unavailability_calendar=calendar,
1462
        source=source1,
1463
        start_datetime=now() - datetime.timedelta(days=1),
1464
        end_datetime=now() + datetime.timedelta(days=1),
1465
    )
1466
    source2 = TimePeriodExceptionSource.objects.create(
1467
        unavailability_calendar=calendar, ics_url='https://example.com/test.ics'
1468
    )
1469
    TimePeriodException.objects.create(
1470
        unavailability_calendar=calendar,
1471
        source=source2,
1472
        start_datetime=now() - datetime.timedelta(days=1),
1473
        end_datetime=now() + datetime.timedelta(days=1),
1474
    )
1475

  
1476
    login(app)
1477
    resp = app.get('/manage/time-period-exceptions-source/%d/delete' % source2.pk)
1478
    resp = resp.form.submit()
1479
    assert TimePeriodException.objects.count() == 1
1480
    assert TimePeriodExceptionSource.objects.count() == 1
1481
    assert source1.timeperiodexception_set.count() == 1
1482
    assert TimePeriodExceptionSource.objects.filter(pk=source2.pk).exists() is False
1483

  
1484

  
1485
def test_unavailability_calendar_replace_time_period_exception_source(app, admin_user, freezer):
1486
    freezer.move_to('2019-12-01')
1487
    calendar = UnavailabilityCalendar.objects.create(label='Example')
1488
    ics_file_content = b"""BEGIN:VCALENDAR
1489
VERSION:2.0
1490
PRODID:-//foo.bar//EN
1491
BEGIN:VEVENT
1492
DTSTART:20180101
1493
DTEND:20180101
1494
SUMMARY:New Year's Eve
1495
RRULE:FREQ=YEARLY
1496
END:VEVENT
1497
END:VCALENDAR"""
1498

  
1499
    login(app)
1500
    # import a source from a file
1501
    resp = app.get('/manage/unavailability-calendar/%s/import-unavailabilities/' % calendar.pk)
1502
    resp.form['ics_file'] = Upload('exceptions.ics', ics_file_content, 'text/calendar')
1503
    resp = resp.form.submit(status=302).follow()
1504
    assert TimePeriodException.objects.filter(unavailability_calendar=calendar).count() == 2
1505
    source = TimePeriodExceptionSource.objects.latest('pk')
1506
    assert source.timeperiodexception_set.count() == 2
1507
    exceptions = list(source.timeperiodexception_set.order_by('pk'))
1508
    old_ics_file_path = source.ics_file.path
1509

  
1510
    # replace the source
1511
    resp = app.get('/manage/time-period-exceptions-source/%d/replace' % source.pk)
1512
    resp.form['ics_newfile'] = Upload('exceptions-bis.ics', ics_file_content, 'text/calendar')
1513
    resp = resp.form.submit().follow()
1514
    source.refresh_from_db()
1515
    assert source.ics_file.path != old_ics_file_path
1516
    assert source.ics_filename == 'exceptions-bis.ics'
1517
    assert os.path.exists(old_ics_file_path) is False
1518
    assert TimePeriodException.objects.count() == 2
1519
    assert source.timeperiodexception_set.count() == 2
1520
    new_exceptions = list(source.timeperiodexception_set.order_by('pk'))
1521
    assert exceptions[0].pk != new_exceptions[0].pk
1522
    assert exceptions[1].pk != new_exceptions[1].pk
1523

  
1524

  
1525
@mock.patch('chrono.agendas.models.requests.get')
1526
def test_unavailability_calendar_refresh_time_period_exception_source(mocked_get, app, admin_user):
1527
    mocked_response = mock.Mock()
1528
    mocked_response.text = """BEGIN:VCALENDAR
1529
VERSION:2.0
1530
PRODID:-//foo.bar//EN
1531
BEGIN:VEVENT
1532
DTSTART:20180101
1533
DTEND:20180101
1534
SUMMARY:New Year's Eve
1535
END:VEVENT
1536
END:VCALENDAR"""
1537
    mocked_get.return_value = mocked_response
1538

  
1539
    calendar = UnavailabilityCalendar.objects.create(label='Example')
1540

  
1541
    login(app)
1542
    # import a source from an url
1543
    resp = app.get('/manage/unavailability-calendar/%d/settings' % calendar.pk)
1544
    resp = resp.click('Manage unavailabilities')
1545
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1546
    resp = resp.form.submit(status=302).follow()
1547
    assert TimePeriodException.objects.filter(unavailability_calendar=calendar).count() == 1
1548
    source = TimePeriodExceptionSource.objects.latest('pk')
1549
    assert source.timeperiodexception_set.count() == 1
1550
    exceptions = list(source.timeperiodexception_set.order_by('pk'))
1551

  
1552
    # refresh the source
1553
    resp = app.get('/manage/unavailability-calendar/%d/settings' % calendar.pk)
1554
    resp = resp.click('Manage unavailabilities')
1555
    resp = resp.click(href='/manage/time-period-exceptions-source/%d/refresh' % source.pk)
1556
    assert TimePeriodException.objects.count() == 1
1557
    new_exceptions = list(source.timeperiodexception_set.order_by('pk'))
1558
    assert exceptions[0].pk != new_exceptions[0].pk
1383
-