Projet

Général

Profil

0001-agenda-add-support-for-remote-calendar-file-with-exc.patch

Serghei Mihai, 19 octobre 2017 19:30

Télécharger (29,5 ko)

Voir les différences:

Subject: [PATCH] agenda: add support for remote calendar file with exceptions
 (#19070)

Remote calendars can be specified by desk and updated hourly, or used once.
 .../commands/sync_desks_timeperiod_exceptions.py   |  32 ++++
 .../agendas/migrations/0020_auto_20171019_1729.py  |  32 ++++
 chrono/agendas/models.py                           |  46 +++++-
 chrono/manager/forms.py                            |  11 +-
 .../chrono/manager_import_exceptions.html          |   1 +
 chrono/manager/views.py                            |  22 ++-
 debian/chrono.cron.d                               |   3 +
 debian/control                                     |   3 +-
 requirements.txt                                   |   1 +
 setup.py                                           |   3 +-
 tests/test_agendas.py                              | 102 +++++++++++-
 tests/test_manager.py                              | 183 ++++++++++++++++++++-
 12 files changed, 412 insertions(+), 27 deletions(-)
 create mode 100644 chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py
 create mode 100644 chrono/agendas/migrations/0020_auto_20171019_1729.py
 create mode 100644 debian/chrono.cron.d
chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py
1
# chrono - agendas system
2
# Copyright (C) 2016-2017  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import logging
18

  
19
from chrono.agendas.models import Desk, ICSError
20
from django.core.management.base import BaseCommand, CommandError
21

  
22

  
23
class Command(BaseCommand):
24
    help = 'Synchronize time period exceptions from desks remote ics'
25

  
26
    def handle(self, **options):
27
        logger = logging.getLogger(__name__)
28
        for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
29
            try:
30
                desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
31
            except ICSError as e:
32
                raise CommandError(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e))
chrono/agendas/migrations/0020_auto_20171019_1729.py
1
# -*- coding: utf-8 -*-
2
from __future__ import unicode_literals
3

  
4
from django.db import migrations, models
5
import datetime
6
from django.utils.timezone import utc
7

  
8

  
9
class Migration(migrations.Migration):
10

  
11
    dependencies = [
12
        ('agendas', '0019_timeperiodexception'),
13
    ]
14

  
15
    operations = [
16
        migrations.AddField(
17
            model_name='desk',
18
            name='timeperiod_exceptions_remote_url',
19
            field=models.URLField(verbose_name='URL to fetch time period exceptions from', blank=True),
20
        ),
21
        migrations.AddField(
22
            model_name='timeperiodexception',
23
            name='creation_date',
24
            field=models.DateTimeField(default=datetime.datetime(2017, 10, 19, 17, 29, 53, 280020, tzinfo=utc), auto_now=True),
25
            preserve_default=False,
26
        ),
27
        migrations.AddField(
28
            model_name='timeperiodexception',
29
            name='external_id',
30
            field=models.CharField(max_length=256, null=True, verbose_name='External ID'),
31
        ),
32
    ]
chrono/agendas/models.py
16 16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 17

  
18 18
import datetime
19
import requests
19 20
import vobject
20 21

  
21 22
from django.contrib.auth.models import Group
......
28 29
from django.utils.encoding import force_text
29 30
from django.utils.formats import date_format, get_format
30 31
from django.utils.text import slugify
31
from django.utils.timezone import localtime, now, make_aware, make_naive
32
from django.utils.timezone import localtime, now, make_aware, make_naive, is_aware
32 33
from django.utils.translation import ugettext_lazy as _
33 34

  
34 35
from jsonfield import JSONField
......
358 359
    agenda = models.ForeignKey(Agenda)
359 360
    label = models.CharField(_('Label'), max_length=150)
360 361
    slug = models.SlugField(_('Identifier'), max_length=150)
362
    timeperiod_exceptions_remote_url = models.URLField(_('URL to fetch time period exceptions from'),
363
                                    blank=True)
361 364

  
362 365
    def __unicode__(self):
363 366
        return self.label
......
417 420
        in_two_weeks = self.get_exceptions_within_two_weeks()
418 421
        return self.timeperiodexception_set.count() == len(in_two_weeks)
419 422

  
420
    def create_timeperiod_exceptions_from_ics(self, data):
423
    def create_timeperiod_exceptions_from_remote_ics(self, url):
424
        try:
425
            response = requests.get(url)
426
            response.raise_for_status()
427
        except requests.HTTPError as e:
428
            raise ICSError(_('Failed to retrieve remote calendar (HTTP error %s).') % e.response.status_code)
429
        except requests.RequestException as e:
430
            raise ICSError(_('Failed to retrieve remote calendar (%s).') % e)
431

  
432
        return self.create_timeperiod_exceptions_from_ics(response.text, store_uid=True)
433

  
434
    def remove_timeperiod_exceptions_from_remote_ics(self):
435
        TimePeriodException.objects.filter(external_id__startswith='desk-%s:' % self.id).delete()
436

  
437
    def create_timeperiod_exceptions_from_ics(self, data, store_uid=False):
421 438
        try:
422 439
            parsed = vobject.readOne(data)
423 440
        except vobject.base.ParseError:
......
429 446
            raise ICSError(_('The file doesn\'t contain any events.'))
430 447

  
431 448
        with transaction.atomic():
449
            update_time = now()
432 450
            for vevent in parsed.contents['vevent']:
433 451
                event = {}
452

  
453
                if store_uid:
454
                    event['external_id'] = 'desk-%s:%s' % (self.id, vevent.contents['uid'][0].value)
434 455
                summary = vevent.contents['summary'][0].value
435 456
                if not isinstance(summary, unicode):
436 457
                    summary = unicode(summary, 'utf-8')
......
441 462
                    if not isinstance(start_dt, datetime.datetime):
442 463
                        start_dt = datetime.datetime.combine(start_dt,
443 464
                                                datetime.datetime.min.time())
444
                    event['start_datetime'] = start_dt
465
                    if not is_aware(start_dt):
466
                        event['start_datetime'] = make_aware(start_dt)
467
                    else:
468
                        event['start_datetime'] = start_dt
445 469
                except AttributeError:
446 470
                    raise ICSError(_('Event "%s" has no start date.') % summary)
447 471
                try:
......
452 476
                except AttributeError:
453 477
                    # events without end date are considered as ending the same day
454 478
                    end_dt = datetime.datetime.combine(start_dt, datetime.datetime.max.time())
455
                event['end_datetime'] = end_dt
456

  
457
                obj, created = TimePeriodException.objects.get_or_create(desk=self, label=summary,
458
                                                                         **event)
479
                if not is_aware(end_dt):
480
                    event['end_datetime'] = make_aware(end_dt)
481
                else:
482
                    event['end_datetime'] = end_dt
483
                obj, created = TimePeriodException.objects.update_or_create(desk=self, label=summary, defaults=event)
484
                # return total_created
459 485
                if created:
460 486
                    total_created += 1
461 487

  
488
            # delete all outdated exceptions from remote calendar
489
            TimePeriodException.objects.filter(creation_date__lt=update_time,
490
                                external_id__startswith='desk-%s:' % self.id).delete()
491

  
462 492
        return total_created
463 493

  
464 494

  
465 495
class TimePeriodException(models.Model):
466 496
    desk = models.ForeignKey(Desk)
497
    external_id = models.CharField(_('External ID'), max_length=256, null=True)
467 498
    label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
468 499
    start_datetime = models.DateTimeField(_('Exception start time'))
469 500
    end_datetime = models.DateTimeField(_('Exception end time'))
501
    creation_date = models.DateTimeField(auto_now=True)
470 502

  
471 503
    class Meta:
472 504
        ordering = ['start_datetime']
chrono/manager/forms.py
16 16

  
17 17
import csv
18 18
import datetime
19
import requests
19 20

  
20 21
from django import forms
21 22
from django.forms import ValidationError
22 23
from django.utils.translation import ugettext_lazy as _
23 24

  
24 25
from chrono.agendas.models import (Event, MeetingType, TimePeriod, Desk,
25
                                   TimePeriodException)
26
                                   TimePeriodException, ICSError)
26 27

  
27 28
from . import widgets
28 29

  
......
83 84
        widgets = {
84 85
            'agenda': forms.HiddenInput(),
85 86
        }
86
        exclude = ['slug']
87
        exclude = ['slug', 'timeperiod_exceptions_remote_url']
87 88

  
88 89

  
89 90
class DeskForm(forms.ModelForm):
......
92 93
        widgets = {
93 94
            'agenda': forms.HiddenInput(),
94 95
        }
95
        exclude = []
96
        exclude = ['timeperiod_exceptions_remote_url']
96 97

  
97 98

  
98 99
class TimePeriodExceptionForm(forms.ModelForm):
......
170 171
        model = Desk
171 172
        fields = []
172 173

  
173
    ics_file = forms.FileField(label=_('ICS File'),
174
    ics_file = forms.FileField(label=_('ICS File'), required=False,
174 175
                               help_text=_('ICS file containing events which will be considered as exceptions'))
176
    ics_url = forms.URLField(label=_('URL'), required=False,
177
                               help_text=_('URL to remote calendar which will be synchronised hourly'))
chrono/manager/templates/chrono/manager_import_exceptions.html
13 13
{% block content %}
14 14

  
15 15
<form method="post" enctype="multipart/form-data">
16
  <p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p>
16 17
  {% csrf_token %}
17 18
  {{ form.as_p }}
18 19
  <p>
chrono/manager/views.py
394 394
    form_class = ExceptionsImportForm
395 395
    template_name = 'chrono/manager_import_exceptions.html'
396 396

  
397
    def get_initial(self):
398
        return {'ics_url': self.get_object().timeperiod_exceptions_remote_url}
399

  
397 400
    def form_valid(self, form):
401
        exceptions = None
398 402
        try:
399
            exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file'])
403
            if form.cleaned_data['ics_file']:
404
                exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file'])
405
            elif form.cleaned_data['ics_url']:
406
                exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url'])
407
            else:
408
                form.instance.remove_timeperiod_exceptions_from_remote_ics()
400 409
        except ICSError as e:
401 410
            form.add_error(None, unicode(e))
402 411
            return self.form_invalid(form)
403
        message = ungettext('An exception has been imported.',
404
                            '%(count)d exceptions have been imported.', exceptions)
405
        message = message % {'count': exceptions}
406
        messages.info(self.request, message)
412
        form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url']
413
        form.instance.save()
414
        if exceptions is not None:
415
            message = ungettext('An exception has been imported.',
416
                                '%(count)d exceptions have been imported.', exceptions)
417
            message = message % {'count': exceptions}
418
            messages.info(self.request, message)
407 419
        return super(DeskImportTimePeriodExceptionsView, self).form_valid(form)
408 420

  
409 421
desk_import_time_period_exceptions = DeskImportTimePeriodExceptionsView.as_view()
debian/chrono.cron.d
1
MAILTO=root
2

  
3
0 * * * * /sbin/runuser -u chrono /usr/bin/chrono-manage -- tenant_command sync_desks_timeperiod_exceptions --all-tenants
debian/control
11 11
Depends: ${misc:Depends}, ${python:Depends},
12 12
    python-django (>= 1.8),
13 13
    python-gadjo,
14
    python-intervaltree
14
    python-intervaltree,
15
    python-requests
15 16
Recommends: python-django-mellon
16 17
Description: Agendas System (Python module)
17 18

  
requirements.txt
3 3
djangorestframework>=3.1, <3.7
4 4
django-jsonfield >= 0.9.3
5 5
intervaltree
6
requests
setup.py
107 107
        'djangorestframework>=3.1, <3.7',
108 108
        'django-jsonfield >= 0.9.3',
109 109
        'intervaltree',
110
        'vobject'
110
        'vobject',
111
        'requests'
111 112
        ],
112 113
    zip_safe=False,
113 114
    cmdclass={
tests/test_agendas.py
1 1
import pytest
2 2
import datetime
3
import logging
4
import mock
5
import requests
6

  
3 7

  
4 8
from django.utils.timezone import now, make_aware, localtime
9
from django.core.management import call_command
10
from django.core.management.base import CommandError
5 11

  
6 12
from chrono.agendas.models import (Agenda, Event, Booking, MeetingType,
7 13
                        Desk, TimePeriodException, ICSError)
......
22 28
BEGIN:VEVENT
23 29
DTSTAMP:20170824T092855Z
24 30
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
25
DTSTART:20170831T180800Z
26
DTEND:20170831T213400Z
31
DTSTART:20170830T180800Z
32
DTEND:20170831T223400Z
27 33
SEQUENCE:2
28 34
SUMMARY:Event 2
29 35
END:VEVENT
......
43 49
DTSTAMP:20170824T082855Z
44 50
DTSTART:20180101
45 51
DTEND:20180101
46
SUMMARY:New eve
52
SUMMARY:New Year's Eve
47 53
RRULE:FREQ=YEARLY
48 54
END:VEVENT
49 55
END:VCALENDAR"""
......
220 226
    with pytest.raises(ICSError) as e:
221 227
        exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS)
222 228
    assert str(e.value) == "The file doesn't contain any events."
229

  
230
@mock.patch('chrono.agendas.models.requests.get')
231
def test_timeperiodexception_creation_from_remote_ics(mocked_get):
232
    agenda = Agenda(label=u'Test 8 agenda')
233
    agenda.save()
234
    desk = Desk(label='Test 8 desk', agenda=agenda)
235
    desk.save()
236
    mocked_response = mock.Mock()
237
    mocked_response.text = ICS_SAMPLE
238
    mocked_get.return_value = mocked_response
239
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
240
    assert exceptions_count == 2
241

  
242
@mock.patch('chrono.agendas.models.requests.get')
243
def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
244
    agenda = Agenda(label=u'Test 9 agenda')
245
    agenda.save()
246
    desk = Desk(label='Test 9 desk', agenda=agenda)
247
    desk.save()
248
    mocked_response = mock.Mock()
249
    mocked_response.text = ICS_SAMPLE
250
    mocked_get.return_value = mocked_response
251
    def mocked_requests_connection_error(*args, **kwargs):
252
        raise requests.ConnectionError('unreachable')
253
    mocked_get.side_effect = mocked_requests_connection_error
254
    with pytest.raises(ICSError) as e:
255
        exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
256
    assert str(e.value) == "Failed to retrieve remote calendar (unreachable)."
257

  
258
@mock.patch('chrono.agendas.models.requests.get')
259
def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
260
    agenda = Agenda(label=u'Test 10 agenda')
261
    agenda.save()
262
    desk = Desk(label='Test 10 desk', agenda=agenda)
263
    desk.save()
264
    mocked_response = mock.Mock()
265
    mocked_response.status_code = 403
266
    mocked_get.return_value = mocked_response
267
    def mocked_requests_http_forbidden_error(*args, **kwargs):
268
        raise requests.HTTPError(response=mocked_response)
269
    mocked_get.side_effect = mocked_requests_http_forbidden_error
270

  
271
    with pytest.raises(ICSError) as e:
272
        exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
273
    assert str(e.value) == "Failed to retrieve remote calendar (HTTP error 403)."
274

  
275
@mock.patch('chrono.agendas.models.requests.get')
276
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, caplog):
277
    agenda = Agenda(label=u'Test 11 agenda')
278
    agenda.save()
279
    desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics')
280
    desk.save()
281
    mocked_response = mock.Mock()
282
    mocked_response.status_code = 403
283
    mocked_get.return_value = mocked_response
284
    def mocked_requests_http_forbidden_error(*args, **kwargs):
285
        raise requests.HTTPError(response=mocked_response)
286
    mocked_get.side_effect = mocked_requests_http_forbidden_error
287
    with pytest.raises(CommandError) as e:
288
        call_command('sync_desks_timeperiod_exceptions')
289
    assert str(e.value) == 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (HTTP error 403).'
290

  
291
@mock.patch('chrono.agendas.models.requests.get')
292
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
293
    agenda = Agenda(label=u'Test 11 agenda')
294
    agenda.save()
295
    desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics')
296
    desk.save()
297
    mocked_response = mock.Mock()
298
    mocked_response.text = ICS_SAMPLE
299
    mocked_get.return_value = mocked_response
300
    call_command('sync_desks_timeperiod_exceptions')
301
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
302
    mocked_response = mock.Mock()
303
    mocked_response.text = """BEGIN:VCALENDAR
304
VERSION:2.0
305
PRODID:-//foo.bar//EN
306
BEGIN:VEVENT
307
DTSTAMP:20180824T082855Z
308
UID:new-and-unique-uid
309
DTSTART:20180831T170800Z
310
DTEND:20180831T203400Z
311
SUMMARY:Wonderfull event
312
END:VEVENT
313
END:VCALENDAR"""
314
    mocked_get.return_value = mocked_response
315
    call_command('sync_desks_timeperiod_exceptions')
316
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
317
    exception = TimePeriodException.objects.get(desk=desk)
318
    assert exception.external_id == 'desk-%s:new-and-unique-uid' % desk.id
tests/test_manager.py
3 3
from django.contrib.auth.models import User, Group
4 4
from django.utils.timezone import make_aware, now, localtime
5 5
import datetime
6
import mock
6 7
import pytest
8
import requests
7 9
from webtest import TestApp, Upload
8 10

  
9 11
from chrono.wsgi import application
......
699 701
    assert 'Desk A' in resp.text
700 702
    assert 'Desk B' in resp.text
701 703

  
702

  
703 704
def test_meetings_agenda_delete_desk(app, admin_user):
704 705
    app = login(app)
705 706
    resp = app.get('/manage/', status=200)
......
840 841
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
841 842
    assert 'Import exceptions from .ics' in resp.content
842 843
    resp = resp.click('upload')
844
    assert "You can upload a file or specify an address to a remote calendar." in resp
845
    resp = resp.form.submit(status=302)
846
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
847
    resp = resp.click('upload')
843 848
    resp.form['ics_file'] = Upload('exceptions.ics', 'invalid content', 'text/calendar')
844 849
    resp = resp.form.submit(status=200)
845 850
    assert 'File format is invalid' in resp.content
......
849 854
BEGIN:VEVENT
850 855
DTSTART:20180101
851 856
DTEND:20180101
852
SUMMARY:New eve
857
SUMMARY:New Year's Eve
853 858
RRULE:FREQ=YEARLY
854 859
END:VEVENT
855 860
END:VCALENDAR"""
......
861 866
PRODID:-//foo.bar//EN
862 867
BEGIN:VEVENT
863 868
DTEND:20180101
864
SUMMARY:New eve
869
SUMMARY:New Year's Eve
865 870
END:VEVENT
866 871
END:VCALENDAR"""
867 872
    resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_start_date, 'text/calendar')
868 873
    resp = resp.form.submit(status=200)
869
    assert 'Event &quot;New eve&quot; has no start date.' in resp.content
874
    assert 'Event &quot;New Year&#39;s Eve&quot; has no start date.' in resp.content
870 875
    ics_with_no_events = """BEGIN:VCALENDAR
871 876
VERSION:2.0
872 877
PRODID:-//foo.bar//EN
......
881 886
BEGIN:VEVENT
882 887
DTSTART:20180101
883 888
DTEND:20180101
884
SUMMARY:New eve
889
SUMMARY:New Year's Eve
885 890
END:VEVENT
886 891
END:VCALENDAR"""
887 892
    resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
888 893
    resp = resp.form.submit(status=302)
889
    assert TimePeriodException.objects.count() == 1
894
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
890 895
    resp = resp.follow()
891 896
    assert 'An exception has been imported.' in resp.content
897

  
898
@mock.patch('chrono.agendas.models.requests.get')
899
def test_agenda_import_time_period_exception_with_remote_ics(mocked_get, app, admin_user):
900
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
901
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
902
    MeetingType(agenda=agenda, label='Bar').save()
903
    login(app)
904
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
905
    assert 'Import exceptions from .ics' not in resp.content
906

  
907
    TimePeriod.objects.create(weekday=1, desk=desk,
908
                start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
909

  
910
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
911
    resp = resp.click('upload')
912

  
913
    assert 'ics_file' in resp.form.fields
914
    assert 'ics_url' in resp.form.fields
915
    resp.form['ics_url'] = 'http://example.com/foo.ics'
916
    mocked_response = mock.Mock()
917
    mocked_response.text =  """BEGIN:VCALENDAR
918
VERSION:2.0
919
PRODID:-//foo.bar//EN
920
BEGIN:VEVENT
921
UID:random-event-id
922
DTSTART:20180101
923
DTEND:20180101
924
SUMMARY:New Year's Eve
925
END:VEVENT
926
END:VCALENDAR"""
927
    mocked_get.return_value = mocked_response
928
    resp = resp.form.submit(status=302)
929
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
930
    exception = TimePeriodException.objects.get(desk=desk)
931
    assert exception.external_id == 'desk-%s:random-event-id' % desk.id
932
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
933
    resp = resp.click('upload')
934
    resp.form['ics_url'] = ''
935
    resp = resp.form.submit(status=302)
936
    assert TimePeriodException.objects.filter(desk=desk,
937
                    external_id='desk-%s:random-event-id' % desk.id).count() == 0
938

  
939

  
940
@mock.patch('chrono.agendas.models.requests.get')
941
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
942
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
943
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
944
    MeetingType(agenda=agenda, label='Bar').save()
945
    login(app)
946
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
947
    assert 'Import exceptions from .ics' not in resp.content
948

  
949
    TimePeriod.objects.create(weekday=1, desk=desk,
950
                start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
951

  
952
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
953
    resp = resp.click('upload')
954
    resp.form['ics_url'] = 'http://example.com/foo.ics'
955
    mocked_response = mock.Mock()
956
    mocked_response.text =  """BEGIN:VCALENDAR
957
VERSION:2.0
958
PRODID:-//foo.bar//EN
959
BEGIN:VEVENT
960
UID:first-eventrandom-event-id
961
DTSTART:20180101
962
DTEND:20180101
963
SUMMARY:First test event
964
END:VEVENT
965
BEGIN:VEVENT
966
UID:second-eventrandom-event-id
967
DTSTART:20190101
968
DTEND:20190101
969
SUMMARY:Second test event
970
END:VEVENT
971
END:VCALENDAR"""
972
    mocked_get.return_value = mocked_response
973
    resp = resp.form.submit(status=302)
974
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
975
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
976
    resp = resp.click('upload')
977
    resp.form['ics_url'] = 'http://example.com/foo.ics'
978
    mocked_response.text = """BEGIN:VCALENDAR
979
VERSION:2.0
980
PRODID:-//foo.bar//EN
981
BEGIN:VEVENT
982
UID:secord-eventrandom-event-id
983
DTSTART:20190101
984
DTEND:20190101
985
SUMMARY:Second test event
986
END:VEVENT
987
END:VCALENDAR"""
988
    mocked_get.return_value = mocked_response
989
    resp = resp.form.submit(status=302)
990
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
991

  
992
@mock.patch('chrono.agendas.models.requests.get')
993
def test_agenda_import_time_period_exception_from_remote_ics_with_connection_error(mocked_get, app, admin_user):
994
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
995
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
996
    MeetingType(agenda=agenda, label='Bar').save()
997
    login(app)
998
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
999
    assert 'Import exceptions from .ics' not in resp.content
1000

  
1001
    TimePeriod.objects.create(weekday=1, desk=desk,
1002
                start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
1003

  
1004
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1005
    resp = resp.click('upload')
1006

  
1007
    assert 'ics_file' in resp.form.fields
1008
    assert 'ics_url' in resp.form.fields
1009
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1010
    mocked_response = mock.Mock()
1011
    mocked_get.return_value = mocked_response
1012
    def mocked_requests_connection_error(*args, **kwargs):
1013
        raise requests.exceptions.ConnectionError('unreachable')
1014
    mocked_get.side_effect = mocked_requests_connection_error
1015
    resp = resp.form.submit(status=200)
1016
    assert 'Failed to retrieve remote calendar (unreachable).' in resp.content
1017

  
1018
@mock.patch('chrono.agendas.models.requests.get')
1019
def test_agenda_import_time_period_exception_from_forbidden_remote_ics(mocked_get, app, admin_user):
1020
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
1021
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
1022
    MeetingType(agenda=agenda, label='Bar').save()
1023
    login(app)
1024
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1025
    assert 'Import exceptions from .ics' not in resp.content
1026

  
1027
    TimePeriod.objects.create(weekday=1, desk=desk,
1028
                start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
1029

  
1030
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1031
    resp = resp.click('upload')
1032
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1033
    mocked_response = mock.Mock()
1034
    mocked_response.status_code = 403
1035
    mocked_get.return_value = mocked_response
1036
    def mocked_requests_http_forbidden_error(*args, **kwargs):
1037
        raise requests.exceptions.HTTPError(response=mocked_response)
1038
    mocked_get.side_effect = mocked_requests_http_forbidden_error
1039
    resp = resp.form.submit(status=200)
1040
    assert 'Failed to retrieve remote calendar (HTTP error 403).' in resp.content
1041

  
1042
@mock.patch('chrono.agendas.models.requests.get')
1043
def test_agenda_import_time_period_exception_from_remote_ics_with_ssl_error(mocked_get, app, admin_user):
1044
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
1045
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
1046
    MeetingType(agenda=agenda, label='Bar').save()
1047
    login(app)
1048
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1049
    assert 'Import exceptions from .ics' not in resp.content
1050
    TimePeriod.objects.create(weekday=1, desk=desk,
1051
                start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
1052

  
1053
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1054
    resp = resp.click('upload')
1055
    resp.form['ics_url'] = 'https://example.com/foo.ics'
1056
    mocked_response = mock.Mock()
1057
    mocked_get.return_value = mocked_response
1058
    def mocked_requests_http_ssl_error(*args, **kwargs):
1059
        raise requests.exceptions.SSLError('SSL error')
1060
    mocked_get.side_effect = mocked_requests_http_ssl_error
1061
    resp = resp.form.submit(status=200)
1062
    assert 'Failed to retrieve remote calendar (SSL error).' in resp.content
892
-