From 1b61098b9cbea2105d5ed894ab050b9a999a8038 Mon Sep 17 00:00:00 2001 From: Serghei Mihai Date: Tue, 3 Oct 2017 23:15:00 +0200 Subject: [PATCH] agenda: add support for remote calendar file with exceptions (#19070) Remote calendars can be specified by desk and updated hourly, or used once. --- chrono/agendas/management/__init__.py | 0 chrono/agendas/management/commands/__init__.py | 0 .../commands/sync_desks_timeperiod_exceptions.py | 29 +++ .../agendas/migrations/0020_auto_20171031_0917.py | 32 +++ chrono/agendas/models.py | 55 ++++- chrono/manager/forms.py | 8 +- .../chrono/manager_import_exceptions.html | 1 + chrono/manager/views.py | 22 +- debian/chrono.cron.d | 3 + debian/control | 3 +- requirements.txt | 2 + setup.py | 3 +- tests/test_agendas.py | 116 ++++++++++- tests/test_manager.py | 225 ++++++++++++++++++++- 14 files changed, 472 insertions(+), 27 deletions(-) create mode 100644 chrono/agendas/management/__init__.py create mode 100644 chrono/agendas/management/commands/__init__.py create mode 100644 chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py create mode 100644 chrono/agendas/migrations/0020_auto_20171031_0917.py create mode 100644 debian/chrono.cron.d diff --git a/chrono/agendas/management/__init__.py b/chrono/agendas/management/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chrono/agendas/management/commands/__init__.py b/chrono/agendas/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py b/chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py new file mode 100644 index 0000000..2fa3dd2 --- /dev/null +++ b/chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py @@ -0,0 +1,29 @@ +# chrono - agendas system +# Copyright (C) 2016-2017 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from chrono.agendas.models import Desk, ICSError +from django.core.management.base import BaseCommand, CommandError + + +class Command(BaseCommand): + help = 'Synchronize time period exceptions from desks remote ics' + + def handle(self, **options): + for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''): + try: + desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url) + except ICSError as e: + raise CommandError(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e)) diff --git a/chrono/agendas/migrations/0020_auto_20171031_0917.py b/chrono/agendas/migrations/0020_auto_20171031_0917.py new file mode 100644 index 0000000..a2c9fd7 --- /dev/null +++ b/chrono/agendas/migrations/0020_auto_20171031_0917.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models +import datetime +from django.utils.timezone import utc + + +class Migration(migrations.Migration): + + dependencies = [ + ('agendas', '0019_timeperiodexception'), + ] + + operations = [ + migrations.AddField( + model_name='desk', + name='timeperiod_exceptions_remote_url', + field=models.URLField(verbose_name='URL to fetch time period exceptions from', blank=True), + ), + migrations.AddField( + model_name='timeperiodexception', + name='external_id', + field=models.CharField(max_length=256, null=True, verbose_name='External ID'), + ), + migrations.AddField( + model_name='timeperiodexception', + name='update_datetime', + field=models.DateTimeField(default=datetime.datetime(2017, 10, 31, 9, 17, 41, 101067, tzinfo=utc), auto_now=True), + preserve_default=False, + ), + ] diff --git a/chrono/agendas/models.py b/chrono/agendas/models.py index 3a1c1a0..ee85ee8 100644 --- a/chrono/agendas/models.py +++ b/chrono/agendas/models.py @@ -16,6 +16,7 @@ # along with this program. If not, see . import datetime +import requests import vobject from django.contrib.auth.models import Group @@ -28,7 +29,7 @@ from django.utils.dates import WEEKDAYS from django.utils.encoding import force_text from django.utils.formats import date_format, get_format from django.utils.text import slugify -from django.utils.timezone import localtime, now, make_aware, make_naive +from django.utils.timezone import localtime, now, make_aware, make_naive, is_aware from django.utils.translation import ugettext_lazy as _ from jsonfield import JSONField @@ -358,6 +359,8 @@ class Desk(models.Model): agenda = models.ForeignKey(Agenda) label = models.CharField(_('Label'), max_length=150) slug = models.SlugField(_('Identifier'), max_length=150) + timeperiod_exceptions_remote_url = models.URLField(_('URL to fetch time period exceptions from'), + blank=True) def __unicode__(self): return self.label @@ -417,7 +420,21 @@ class Desk(models.Model): in_two_weeks = self.get_exceptions_within_two_weeks() return self.timeperiodexception_set.count() == len(in_two_weeks) - def create_timeperiod_exceptions_from_ics(self, data): + def create_timeperiod_exceptions_from_remote_ics(self, url): + try: + response = requests.get(url) + response.raise_for_status() + except requests.HTTPError as e: + raise ICSError(_('Failed to retrieve remote calendar (HTTP error %s).') % e.response.status_code) + except requests.RequestException as e: + raise ICSError(_('Failed to retrieve remote calendar (%s).') % e) + + return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True) + + def remove_timeperiod_exceptions_from_remote_ics(self): + TimePeriodException.objects.exclude(desk=self, external_id='').delete() + + def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False): try: parsed = vobject.readOne(data) except vobject.base.ParseError: @@ -425,12 +442,14 @@ class Desk(models.Model): total_created = 0 - if not parsed.contents.get('vevent'): + if not parsed.contents.get('vevent') and not keep_synced_by_uid: raise ICSError(_('The file doesn\'t contain any events.')) with transaction.atomic(): - for vevent in parsed.contents['vevent']: + update_datetime = now() + for vevent in parsed.contents.get('vevent', []): event = {} + summary = vevent.contents['summary'][0].value if not isinstance(summary, unicode): summary = unicode(summary, 'utf-8') @@ -441,7 +460,10 @@ class Desk(models.Model): if not isinstance(start_dt, datetime.datetime): start_dt = datetime.datetime.combine(start_dt, datetime.datetime.min.time()) - event['start_datetime'] = start_dt + if not is_aware(start_dt): + event['start_datetime'] = make_aware(start_dt) + else: + event['start_datetime'] = start_dt except AttributeError: raise ICSError(_('Event "%s" has no start date.') % summary) try: @@ -452,21 +474,36 @@ class Desk(models.Model): except AttributeError: # events without end date are considered as ending the same day end_dt = datetime.datetime.combine(start_dt, datetime.datetime.max.time()) - event['end_datetime'] = end_dt - - obj, created = TimePeriodException.objects.get_or_create(desk=self, label=summary, - **event) + if not is_aware(end_dt): + event['end_datetime'] = make_aware(end_dt) + else: + event['end_datetime'] = end_dt + if keep_synced_by_uid: + external_id = vevent.contents['uid'][0].value + event['label'] = summary + obj, created = TimePeriodException.objects.update_or_create(desk=self, external_id=external_id, + defaults=event) + else: + obj, created = TimePeriodException.objects.update_or_create(desk=self, label=summary, defaults=event) + # return total_created if created: total_created += 1 + if keep_synced_by_uid: + # delete all outdated exceptions from remote calendar + TimePeriodException.objects.filter(update_datetime__lt=update_datetime, + desk=self).exclude(external_id='').delete() + return total_created class TimePeriodException(models.Model): desk = models.ForeignKey(Desk) + external_id = models.CharField(_('External ID'), max_length=256, null=True) label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True) start_datetime = models.DateTimeField(_('Exception start time')) end_datetime = models.DateTimeField(_('Exception end time')) + update_datetime = models.DateTimeField(auto_now=True) class Meta: ordering = ['start_datetime'] diff --git a/chrono/manager/forms.py b/chrono/manager/forms.py index cc6e37c..9388d72 100644 --- a/chrono/manager/forms.py +++ b/chrono/manager/forms.py @@ -83,7 +83,7 @@ class NewDeskForm(forms.ModelForm): widgets = { 'agenda': forms.HiddenInput(), } - exclude = ['slug'] + exclude = ['slug', 'timeperiod_exceptions_remote_url'] class DeskForm(forms.ModelForm): @@ -92,7 +92,7 @@ class DeskForm(forms.ModelForm): widgets = { 'agenda': forms.HiddenInput(), } - exclude = [] + exclude = ['timeperiod_exceptions_remote_url'] class TimePeriodExceptionForm(forms.ModelForm): @@ -170,5 +170,7 @@ class ExceptionsImportForm(forms.ModelForm): model = Desk fields = [] - ics_file = forms.FileField(label=_('ICS File'), + ics_file = forms.FileField(label=_('ICS File'), required=False, help_text=_('ICS file containing events which will be considered as exceptions')) + ics_url = forms.URLField(label=_('URL'), required=False, + help_text=_('URL to remote calendar which will be synchronised hourly')) diff --git a/chrono/manager/templates/chrono/manager_import_exceptions.html b/chrono/manager/templates/chrono/manager_import_exceptions.html index d3eacc5..1b119d8 100644 --- a/chrono/manager/templates/chrono/manager_import_exceptions.html +++ b/chrono/manager/templates/chrono/manager_import_exceptions.html @@ -13,6 +13,7 @@ {% block content %}
+

{% trans "You can upload a file or specify an address to a remote calendar." %}

{% csrf_token %} {{ form.as_p }}

diff --git a/chrono/manager/views.py b/chrono/manager/views.py index c5d4eae..91722c7 100644 --- a/chrono/manager/views.py +++ b/chrono/manager/views.py @@ -394,16 +394,28 @@ class DeskImportTimePeriodExceptionsView(ManagedAgendaSubobjectMixin, UpdateView form_class = ExceptionsImportForm template_name = 'chrono/manager_import_exceptions.html' + def get_initial(self): + return {'ics_url': self.get_object().timeperiod_exceptions_remote_url} + def form_valid(self, form): + exceptions = None try: - exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file']) + if form.cleaned_data['ics_file']: + exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file']) + elif form.cleaned_data['ics_url']: + exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url']) + else: + form.instance.remove_timeperiod_exceptions_from_remote_ics() except ICSError as e: form.add_error(None, unicode(e)) return self.form_invalid(form) - message = ungettext('An exception has been imported.', - '%(count)d exceptions have been imported.', exceptions) - message = message % {'count': exceptions} - messages.info(self.request, message) + form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url'] + form.instance.save() + if exceptions is not None: + message = ungettext('An exception has been imported.', + '%(count)d exceptions have been imported.', exceptions) + message = message % {'count': exceptions} + messages.info(self.request, message) return super(DeskImportTimePeriodExceptionsView, self).form_valid(form) desk_import_time_period_exceptions = DeskImportTimePeriodExceptionsView.as_view() diff --git a/debian/chrono.cron.d b/debian/chrono.cron.d new file mode 100644 index 0000000..83691cf --- /dev/null +++ b/debian/chrono.cron.d @@ -0,0 +1,3 @@ +MAILTO=root + +0 * * * * /sbin/runuser -u chrono /usr/bin/chrono-manage -- tenant_command sync_desks_timeperiod_exceptions --all-tenants diff --git a/debian/control b/debian/control index f80b130..0ce7c45 100644 --- a/debian/control +++ b/debian/control @@ -11,7 +11,8 @@ Architecture: all Depends: ${misc:Depends}, ${python:Depends}, python-django (>= 1.8), python-gadjo, - python-intervaltree + python-intervaltree, + python-requests Recommends: python-django-mellon Description: Agendas System (Python module) diff --git a/requirements.txt b/requirements.txt index 536c4b8..ed6f4ce 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ gadjo djangorestframework>=3.1, <3.7 django-jsonfield >= 0.9.3 intervaltree +requests +vobject diff --git a/setup.py b/setup.py index 8fefafb..bfcaa56 100644 --- a/setup.py +++ b/setup.py @@ -107,7 +107,8 @@ setup( 'djangorestframework>=3.1, <3.7', 'django-jsonfield >= 0.9.3', 'intervaltree', - 'vobject' + 'vobject', + 'requests' ], zip_safe=False, cmdclass={ diff --git a/tests/test_agendas.py b/tests/test_agendas.py index 7560a66..e024fe4 100644 --- a/tests/test_agendas.py +++ b/tests/test_agendas.py @@ -1,7 +1,13 @@ import pytest import datetime +import mock +import re +import requests + from django.utils.timezone import now, make_aware, localtime +from django.core.management import call_command +from django.core.management.base import CommandError from chrono.agendas.models import (Agenda, Event, Booking, MeetingType, Desk, TimePeriodException, ICSError) @@ -22,8 +28,8 @@ END:VEVENT BEGIN:VEVENT DTSTAMP:20170824T092855Z UID:950c3ff889d2465dd5d648c4c2194232c0a565f4 -DTSTART:20170831T180800Z -DTEND:20170831T213400Z +DTSTART:20170830T180800Z +DTEND:20170831T223400Z SEQUENCE:2 SUMMARY:Event 2 END:VEVENT @@ -43,7 +49,7 @@ BEGIN:VEVENT DTSTAMP:20170824T082855Z DTSTART:20180101 DTEND:20180101 -SUMMARY:New eve +SUMMARY:New Year's Eve RRULE:FREQ=YEARLY END:VEVENT END:VCALENDAR""" @@ -220,3 +226,107 @@ def test_timeexception_create_from_ics_with_no_events(): with pytest.raises(ICSError) as e: exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS) assert str(e.value) == "The file doesn't contain any events." + +@mock.patch('chrono.agendas.models.requests.get') +def test_timeperiodexception_creation_from_remote_ics(mocked_get): + agenda = Agenda(label=u'Test 8 agenda') + agenda.save() + desk = Desk(label='Test 8 desk', agenda=agenda) + desk.save() + mocked_response = mock.Mock() + mocked_response.text = ICS_SAMPLE + mocked_get.return_value = mocked_response + exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics') + assert exceptions_count == 2 + mocked_response.text = re.sub('SUMMARY:\w+', 'SUMMARY:New summmary', ICS_SAMPLE, re.MULTILINE) + mocked_get.return_value = mocked_response + desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics') + for timeperiod in TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id, desk=desk): + assert 'New summary ' in timeperiod.label + + mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS + mocked_get.return_value = mocked_response + exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics') + assert exceptions_count == 0 + TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id).count() == 0 + +@mock.patch('chrono.agendas.models.requests.get') +def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get): + agenda = Agenda(label=u'Test 9 agenda') + agenda.save() + desk = Desk(label='Test 9 desk', agenda=agenda) + desk.save() + mocked_response = mock.Mock() + mocked_response.text = ICS_SAMPLE + mocked_get.return_value = mocked_response + def mocked_requests_connection_error(*args, **kwargs): + raise requests.ConnectionError('unreachable') + mocked_get.side_effect = mocked_requests_connection_error + with pytest.raises(ICSError) as e: + exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics') + assert str(e.value) == "Failed to retrieve remote calendar (unreachable)." + +@mock.patch('chrono.agendas.models.requests.get') +def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get): + agenda = Agenda(label=u'Test 10 agenda') + agenda.save() + desk = Desk(label='Test 10 desk', agenda=agenda) + desk.save() + mocked_response = mock.Mock() + mocked_response.status_code = 403 + mocked_get.return_value = mocked_response + def mocked_requests_http_forbidden_error(*args, **kwargs): + raise requests.HTTPError(response=mocked_response) + mocked_get.side_effect = mocked_requests_http_forbidden_error + + with pytest.raises(ICSError) as e: + exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics') + assert str(e.value) == "Failed to retrieve remote calendar (HTTP error 403)." + +@mock.patch('chrono.agendas.models.requests.get') +def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, caplog): + agenda = Agenda(label=u'Test 11 agenda') + agenda.save() + desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics') + desk.save() + mocked_response = mock.Mock() + mocked_response.status_code = 403 + mocked_get.return_value = mocked_response + def mocked_requests_http_forbidden_error(*args, **kwargs): + raise requests.HTTPError(response=mocked_response) + mocked_get.side_effect = mocked_requests_http_forbidden_error + with pytest.raises(CommandError) as e: + call_command('sync_desks_timeperiod_exceptions') + assert str(e.value) == 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (HTTP error 403).' + +@mock.patch('chrono.agendas.models.requests.get') +def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog): + agenda = Agenda(label=u'Test 11 agenda') + agenda.save() + desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics') + desk.save() + mocked_response = mock.Mock() + mocked_response.text = ICS_SAMPLE + mocked_get.return_value = mocked_response + call_command('sync_desks_timeperiod_exceptions') + assert TimePeriodException.objects.filter(desk=desk).count() == 2 + mocked_response.text = """BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//foo.bar//EN +BEGIN:VEVENT +DTSTAMP:20180824T082855Z +UID:new-and-unique-uid +DTSTART:20180831T170800Z +DTEND:20180831T203400Z +SUMMARY:Wonderfull event +END:VEVENT +END:VCALENDAR""" + mocked_get.return_value = mocked_response + call_command('sync_desks_timeperiod_exceptions') + assert TimePeriodException.objects.filter(desk=desk).count() == 1 + exception = TimePeriodException.objects.get(desk=desk) + assert exception.external_id == 'new-and-unique-uid' + mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS + mocked_get.return_value = mocked_response + call_command('sync_desks_timeperiod_exceptions') + assert not TimePeriodException.objects.filter(desk=desk).exists() diff --git a/tests/test_manager.py b/tests/test_manager.py index a39190e..18f1f8b 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -3,7 +3,9 @@ from django.contrib.auth.models import User, Group from django.utils.timezone import make_aware, now, localtime import datetime +import mock import pytest +import requests from webtest import TestApp, Upload from chrono.wsgi import application @@ -840,6 +842,10 @@ def test_agenda_import_time_period_exception_from_ics(app, admin_user): resp = app.get('/manage/agendas/%d/' % agenda.pk) assert 'Import exceptions from .ics' in resp.content resp = resp.click('upload') + assert "You can upload a file or specify an address to a remote calendar." in resp + resp = resp.form.submit(status=302) + resp = app.get('/manage/agendas/%d/' % agenda.pk) + resp = resp.click('upload') resp.form['ics_file'] = Upload('exceptions.ics', 'invalid content', 'text/calendar') resp = resp.form.submit(status=200) assert 'File format is invalid' in resp.content @@ -849,7 +855,7 @@ PRODID:-//foo.bar//EN BEGIN:VEVENT DTSTART:20180101 DTEND:20180101 -SUMMARY:New eve +SUMMARY:New Year's Eve RRULE:FREQ=YEARLY END:VEVENT END:VCALENDAR""" @@ -861,12 +867,12 @@ VERSION:2.0 PRODID:-//foo.bar//EN BEGIN:VEVENT DTEND:20180101 -SUMMARY:New eve +SUMMARY:New Year's Eve END:VEVENT END:VCALENDAR""" resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_start_date, 'text/calendar') resp = resp.form.submit(status=200) - assert 'Event "New eve" has no start date.' in resp.content + assert 'Event "New Year's Eve" has no start date.' in resp.content ics_with_no_events = """BEGIN:VCALENDAR VERSION:2.0 PRODID:-//foo.bar//EN @@ -881,11 +887,220 @@ PRODID:-//foo.bar//EN BEGIN:VEVENT DTSTART:20180101 DTEND:20180101 -SUMMARY:New eve +SUMMARY:New Year's Eve END:VEVENT END:VCALENDAR""" + resp = app.get('/manage/agendas/%d/' % agenda.pk) + resp = resp.click('upload') resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar') resp = resp.form.submit(status=302) - assert TimePeriodException.objects.count() == 1 + assert TimePeriodException.objects.filter(desk=desk).count() == 1 resp = resp.follow() assert 'An exception has been imported.' in resp.content + +@mock.patch('chrono.agendas.models.requests.get') +def test_agenda_import_time_period_exception_with_remote_ics(mocked_get, app, admin_user): + agenda = Agenda.objects.create(label='New Example', kind='meetings') + desk = Desk.objects.create(agenda=agenda, label='New Desk') + MeetingType(agenda=agenda, label='Bar').save() + login(app) + resp = app.get('/manage/agendas/%d/' % agenda.pk) + assert 'Import exceptions from .ics' not in resp.content + + TimePeriod.objects.create(weekday=1, desk=desk, + start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)) + + resp = app.get('/manage/agendas/%d/' % agenda.pk) + resp = resp.click('upload') + + assert 'ics_file' in resp.form.fields + assert 'ics_url' in resp.form.fields + resp.form['ics_url'] = 'http://example.com/foo.ics' + mocked_response = mock.Mock() + mocked_response.text = """BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//foo.bar//EN +BEGIN:VEVENT +UID:random-event-id +DTSTART:20180101 +DTEND:20180101 +SUMMARY:New Year's Eve +END:VEVENT +END:VCALENDAR""" + mocked_get.return_value = mocked_response + resp = resp.form.submit(status=302) + assert TimePeriodException.objects.filter(desk=desk).count() == 1 + exception = TimePeriodException.objects.get(desk=desk) + assert exception.external_id == 'random-event-id' + resp = app.get('/manage/agendas/%d/' % agenda.pk) + resp = resp.click('upload') + resp.form['ics_url'] = '' + resp = resp.form.submit(status=302) + assert not TimePeriodException.objects.filter(desk=desk, + external_id='desk-%s:random-event-id' % desk.id).exists() + +@mock.patch('chrono.agendas.models.requests.get') +def test_agenda_import_time_period_exception_with_remote_ics_no_events(mocked_get, app, admin_user): + agenda = Agenda.objects.create(label='New Example', kind='meetings') + desk = Desk.objects.create(agenda=agenda, label='New Desk') + MeetingType(agenda=agenda, label='Bar').save() + login(app) + resp = app.get('/manage/agendas/%d/' % agenda.pk) + assert 'Import exceptions from .ics' not in resp.content + + TimePeriod.objects.create(weekday=1, desk=desk, + start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)) + + resp = app.get('/manage/agendas/%d/' % agenda.pk) + resp = resp.click('upload') + resp.form['ics_url'] = 'http://example.com/foo.ics' + mocked_response = mock.Mock() + mocked_response.text = """BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//foo.bar//EN +BEGIN:VEVENT +UID:random-event-id +DTSTART:20180101 +DTEND:20180101 +SUMMARY:New Year's Eve +END:VEVENT +END:VCALENDAR""" + mocked_get.return_value = mocked_response + resp = resp.form.submit(status=302) + assert TimePeriodException.objects.filter(desk=desk).count() == 1 + exception = TimePeriodException.objects.get(desk=desk) + assert exception.external_id == 'random-event-id' + mocked_response.text = """BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//foo.bar//EN +END:VCALENDAR""" + resp = app.get('/manage/agendas/%d/' % agenda.pk) + resp = resp.click('upload') + resp = resp.form.submit(status=302) + assert not TimePeriodException.objects.filter(desk=desk, + external_id='random-event-id').exists() + + +@mock.patch('chrono.agendas.models.requests.get') +def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user): + agenda = Agenda.objects.create(label='New Example', kind='meetings') + desk = Desk.objects.create(agenda=agenda, label='New Desk') + MeetingType(agenda=agenda, label='Bar').save() + login(app) + resp = app.get('/manage/agendas/%d/' % agenda.pk) + assert 'Import exceptions from .ics' not in resp.content + + TimePeriod.objects.create(weekday=1, desk=desk, + start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)) + + resp = app.get('/manage/agendas/%d/' % agenda.pk) + resp = resp.click('upload') + resp.form['ics_url'] = 'http://example.com/foo.ics' + mocked_response = mock.Mock() + mocked_response.text = """BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//foo.bar//EN +BEGIN:VEVENT +UID:first-eventrandom-event-id +DTSTART:20180101 +DTEND:20180101 +SUMMARY:First test event +END:VEVENT +BEGIN:VEVENT +UID:second-eventrandom-event-id +DTSTART:20190101 +DTEND:20190101 +SUMMARY:Second test event +END:VEVENT +END:VCALENDAR""" + mocked_get.return_value = mocked_response + resp = resp.form.submit(status=302) + assert TimePeriodException.objects.filter(desk=desk).count() == 2 + resp = app.get('/manage/agendas/%d/' % agenda.pk) + resp = resp.click('upload') + resp.form['ics_url'] = 'http://example.com/foo.ics' + mocked_response.text = """BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//foo.bar//EN +BEGIN:VEVENT +UID:secord-eventrandom-event-id +DTSTART:20190101 +DTEND:20190101 +SUMMARY:Second test event +END:VEVENT +END:VCALENDAR""" + mocked_get.return_value = mocked_response + resp = resp.form.submit(status=302) + assert TimePeriodException.objects.filter(desk=desk).count() == 1 + +@mock.patch('chrono.agendas.models.requests.get') +def test_agenda_import_time_period_exception_from_remote_ics_with_connection_error(mocked_get, app, admin_user): + agenda = Agenda.objects.create(label='New Example', kind='meetings') + desk = Desk.objects.create(agenda=agenda, label='New Desk') + MeetingType(agenda=agenda, label='Bar').save() + login(app) + resp = app.get('/manage/agendas/%d/' % agenda.pk) + assert 'Import exceptions from .ics' not in resp.content + + TimePeriod.objects.create(weekday=1, desk=desk, + start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)) + + resp = app.get('/manage/agendas/%d/' % agenda.pk) + resp = resp.click('upload') + + assert 'ics_file' in resp.form.fields + assert 'ics_url' in resp.form.fields + resp.form['ics_url'] = 'http://example.com/foo.ics' + mocked_response = mock.Mock() + mocked_get.return_value = mocked_response + def mocked_requests_connection_error(*args, **kwargs): + raise requests.exceptions.ConnectionError('unreachable') + mocked_get.side_effect = mocked_requests_connection_error + resp = resp.form.submit(status=200) + assert 'Failed to retrieve remote calendar (unreachable).' in resp.content + +@mock.patch('chrono.agendas.models.requests.get') +def test_agenda_import_time_period_exception_from_forbidden_remote_ics(mocked_get, app, admin_user): + agenda = Agenda.objects.create(label='New Example', kind='meetings') + desk = Desk.objects.create(agenda=agenda, label='New Desk') + MeetingType(agenda=agenda, label='Bar').save() + login(app) + resp = app.get('/manage/agendas/%d/' % agenda.pk) + assert 'Import exceptions from .ics' not in resp.content + + TimePeriod.objects.create(weekday=1, desk=desk, + start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)) + + resp = app.get('/manage/agendas/%d/' % agenda.pk) + resp = resp.click('upload') + resp.form['ics_url'] = 'http://example.com/foo.ics' + mocked_response = mock.Mock() + mocked_response.status_code = 403 + mocked_get.return_value = mocked_response + def mocked_requests_http_forbidden_error(*args, **kwargs): + raise requests.exceptions.HTTPError(response=mocked_response) + mocked_get.side_effect = mocked_requests_http_forbidden_error + resp = resp.form.submit(status=200) + assert 'Failed to retrieve remote calendar (HTTP error 403).' in resp.content + +@mock.patch('chrono.agendas.models.requests.get') +def test_agenda_import_time_period_exception_from_remote_ics_with_ssl_error(mocked_get, app, admin_user): + agenda = Agenda.objects.create(label='New Example', kind='meetings') + desk = Desk.objects.create(agenda=agenda, label='New Desk') + MeetingType(agenda=agenda, label='Bar').save() + login(app) + resp = app.get('/manage/agendas/%d/' % agenda.pk) + assert 'Import exceptions from .ics' not in resp.content + TimePeriod.objects.create(weekday=1, desk=desk, + start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)) + + resp = app.get('/manage/agendas/%d/' % agenda.pk) + resp = resp.click('upload') + resp.form['ics_url'] = 'https://example.com/foo.ics' + mocked_response = mock.Mock() + mocked_get.return_value = mocked_response + def mocked_requests_http_ssl_error(*args, **kwargs): + raise requests.exceptions.SSLError('SSL error') + mocked_get.side_effect = mocked_requests_http_ssl_error + resp = resp.form.submit(status=200) + assert 'Failed to retrieve remote calendar (SSL error).' in resp.content -- 2.15.0