Projet

Général

Profil

0001-agenda-add-support-for-remote-calendar-file-with-exc.patch

Serghei Mihai, 20 octobre 2017 12:11

Télécharger (31,3 ko)

Voir les différences:

Subject: [PATCH] agenda: add support for remote calendar file with exceptions
 (#19070)

Remote calendars can be specified by desk and updated hourly, or used once.
 chrono/agendas/management/__init__.py              |   0
 chrono/agendas/management/commands/__init__.py     |   0
 .../commands/sync_desks_timeperiod_exceptions.py   |  32 ++++
 .../agendas/migrations/0020_auto_20171019_1729.py  |  32 ++++
 chrono/agendas/models.py                           |  49 +++++-
 chrono/manager/forms.py                            |  11 +-
 .../chrono/manager_import_exceptions.html          |   1 +
 chrono/manager/views.py                            |  22 ++-
 debian/chrono.cron.d                               |   3 +
 debian/control                                     |   3 +-
 requirements.txt                                   |   2 +
 setup.py                                           |   3 +-
 tests/test_agendas.py                              | 108 +++++++++++-
 tests/test_manager.py                              | 190 ++++++++++++++++++++-
 14 files changed, 421 insertions(+), 35 deletions(-)
 create mode 100644 chrono/agendas/management/__init__.py
 create mode 100644 chrono/agendas/management/commands/__init__.py
 create mode 100644 chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py
 create mode 100644 chrono/agendas/migrations/0020_auto_20171019_1729.py
 create mode 100644 debian/chrono.cron.d
chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py
1
# chrono - agendas system
2
# Copyright (C) 2016-2017  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import logging
18

  
19
from chrono.agendas.models import Desk, ICSError
20
from django.core.management.base import BaseCommand, CommandError
21

  
22

  
23
class Command(BaseCommand):
    """Management command refreshing desks' exceptions from their remote ics."""

    help = 'Synchronize time period exceptions from desks remote ics'

    def handle(self, **options):
        """Synchronize every desk that has a remote calendar URL.

        All desks are attempted even when one of them fails, so a single
        unreachable calendar does not prevent the others from being
        refreshed.  Failures are gathered and reported together.

        Raises:
            CommandError: if at least one desk could not be synchronized;
                the message holds one line per failing desk.
        """
        errors = []
        for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
            try:
                desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
            except ICSError as e:
                # keep going: remaining desks must still be synchronized
                errors.append(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e))
        if errors:
            raise CommandError(u'\n'.join(errors))
chrono/agendas/migrations/0020_auto_20171019_1729.py
1
# -*- coding: utf-8 -*-
2
from __future__ import unicode_literals
3

  
4
from django.db import migrations, models
5
import datetime
6
from django.utils.timezone import utc
7

  
8

  
9
class Migration(migrations.Migration):
    """Add the remote calendar URL on desks and two fields on exceptions."""

    dependencies = [('agendas', '0019_timeperiodexception')]

    operations = [
        # URL of the remote calendar attached to a desk (may be left blank)
        migrations.AddField(
            model_name='desk',
            name='timeperiod_exceptions_remote_url',
            field=models.URLField(
                verbose_name='URL to fetch time period exceptions from',
                blank=True),
        ),
        # auto_now=True: the value is refreshed on every save; the fixed
        # default only fills already existing rows (preserve_default=False)
        migrations.AddField(
            model_name='timeperiodexception',
            name='creation_date',
            field=models.DateTimeField(
                default=datetime.datetime(2017, 10, 19, 17, 29, 53, 280020, tzinfo=utc),
                auto_now=True),
            preserve_default=False,
        ),
        # nullable identifier for exceptions coming from an external source
        migrations.AddField(
            model_name='timeperiodexception',
            name='external_id',
            field=models.CharField(
                max_length=256,
                null=True,
                verbose_name='External ID'),
        ),
    ]
chrono/agendas/models.py
16 16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 17

  
18 18
import datetime
19
import requests
19 20
import vobject
20 21

  
21 22
from django.contrib.auth.models import Group
......
28 29
from django.utils.encoding import force_text
29 30
from django.utils.formats import date_format, get_format
30 31
from django.utils.text import slugify
31
from django.utils.timezone import localtime, now, make_aware, make_naive
32
from django.utils.timezone import localtime, now, make_aware, make_naive, is_aware
32 33
from django.utils.translation import ugettext_lazy as _
33 34

  
34 35
from jsonfield import JSONField
......
358 359
    agenda = models.ForeignKey(Agenda)
359 360
    label = models.CharField(_('Label'), max_length=150)
360 361
    slug = models.SlugField(_('Identifier'), max_length=150)
362
    timeperiod_exceptions_remote_url = models.URLField(_('URL to fetch time period exceptions from'),
363
                                    blank=True)
361 364

  
362 365
    def __unicode__(self):
363 366
        return self.label
......
417 420
        in_two_weeks = self.get_exceptions_within_two_weeks()
418 421
        return self.timeperiodexception_set.count() == len(in_two_weeks)
419 422

  
420
    def create_timeperiod_exceptions_from_ics(self, data):
423
    def create_timeperiod_exceptions_from_remote_ics(self, url):
        """Download the calendar at ``url`` and import its events as exceptions.

        The downloaded text is handed to
        ``create_timeperiod_exceptions_from_ics`` with ``store_uid=True``.

        Args:
            url: address of the remote ics calendar.

        Returns:
            The value returned by ``create_timeperiod_exceptions_from_ics``.

        Raises:
            ICSError: if the calendar cannot be retrieved (HTTP error status,
                connection failure, timeout, ...).
        """
        try:
            # Without a timeout requests waits forever on a stalled server.
            # A timeout is a RequestException, so it is reported as ICSError
            # like every other retrieval failure.
            response = requests.get(url, timeout=30)
            response.raise_for_status()
        except requests.HTTPError as e:
            raise ICSError(_('Failed to retrieve remote calendar (HTTP error %s).') % e.response.status_code)
        except requests.RequestException as e:
            raise ICSError(_('Failed to retrieve remote calendar (%s).') % e)

        return self.create_timeperiod_exceptions_from_ics(response.text, store_uid=True)
433

  
434
    def remove_timeperiod_exceptions_from_remote_ics(self):
        """Delete the exceptions whose external id starts with ``desk-<id>:``."""
        prefix = 'desk-%s:' % self.id
        remote_exceptions = TimePeriodException.objects.filter(external_id__startswith=prefix)
        remote_exceptions.delete()
436

  
437
    def create_timeperiod_exceptions_from_ics(self, data, store_uid=False):
421 438
        try:
422 439
            parsed = vobject.readOne(data)
423 440
        except vobject.base.ParseError:
......
426 443
        total_created = 0
427 444

  
428 445
        if not parsed.contents.get('vevent'):
429
            raise ICSError(_('The file doesn\'t contain any events.'))
446
            TimePeriodException.objects.filter(external_id__startswith='desk-%s:' % self.id).delete()
447
            return total_created
430 448

  
431 449
        with transaction.atomic():
450
            update_time = now()
432 451
            for vevent in parsed.contents['vevent']:
433 452
                event = {}
453

  
454
                if store_uid:
455
                    event['external_id'] = 'desk-%s:%s' % (self.id, vevent.contents['uid'][0].value)
434 456
                summary = vevent.contents['summary'][0].value
435 457
                if not isinstance(summary, unicode):
436 458
                    summary = unicode(summary, 'utf-8')
......
441 463
                    if not isinstance(start_dt, datetime.datetime):
442 464
                        start_dt = datetime.datetime.combine(start_dt,
443 465
                                                datetime.datetime.min.time())
444
                    event['start_datetime'] = start_dt
466
                    if not is_aware(start_dt):
467
                        event['start_datetime'] = make_aware(start_dt)
468
                    else:
469
                        event['start_datetime'] = start_dt
445 470
                except AttributeError:
446 471
                    raise ICSError(_('Event "%s" has no start date.') % summary)
447 472
                try:
......
452 477
                except AttributeError:
453 478
                    # events without end date are considered as ending the same day
454 479
                    end_dt = datetime.datetime.combine(start_dt, datetime.datetime.max.time())
455
                event['end_datetime'] = end_dt
456

  
457
                obj, created = TimePeriodException.objects.get_or_create(desk=self, label=summary,
458
                                                                         **event)
480
                if not is_aware(end_dt):
481
                    event['end_datetime'] = make_aware(end_dt)
482
                else:
483
                    event['end_datetime'] = end_dt
484
                obj, created = TimePeriodException.objects.update_or_create(desk=self, label=summary, defaults=event)
485
                # return total_created
459 486
                if created:
460 487
                    total_created += 1
461 488

  
489
            # delete all outdated exceptions from remote calendar
490
            TimePeriodException.objects.filter(creation_date__lt=update_time,
491
                                external_id__startswith='desk-%s:' % self.id).delete()
492

  
462 493
        return total_created
463 494

  
464 495

  
465 496
class TimePeriodException(models.Model):
466 497
    desk = models.ForeignKey(Desk)
498
    external_id = models.CharField(_('External ID'), max_length=256, null=True)
467 499
    label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
468 500
    start_datetime = models.DateTimeField(_('Exception start time'))
469 501
    end_datetime = models.DateTimeField(_('Exception end time'))
502
    creation_date = models.DateTimeField(auto_now=True)
470 503

  
471 504
    class Meta:
472 505
        ordering = ['start_datetime']
chrono/manager/forms.py
16 16

  
17 17
import csv
18 18
import datetime
19
import requests
19 20

  
20 21
from django import forms
21 22
from django.forms import ValidationError
22 23
from django.utils.translation import ugettext_lazy as _
23 24

  
24 25
from chrono.agendas.models import (Event, MeetingType, TimePeriod, Desk,
25
                                   TimePeriodException)
26
                                   TimePeriodException, ICSError)
26 27

  
27 28
from . import widgets
28 29

  
......
83 84
        widgets = {
84 85
            'agenda': forms.HiddenInput(),
85 86
        }
86
        exclude = ['slug']
87
        exclude = ['slug', 'timeperiod_exceptions_remote_url']
87 88

  
88 89

  
89 90
class DeskForm(forms.ModelForm):
......
92 93
        widgets = {
93 94
            'agenda': forms.HiddenInput(),
94 95
        }
95
        exclude = []
96
        exclude = ['timeperiod_exceptions_remote_url']
96 97

  
97 98

  
98 99
class TimePeriodExceptionForm(forms.ModelForm):
......
170 171
        model = Desk
171 172
        fields = []
172 173

  
173
    ics_file = forms.FileField(label=_('ICS File'),
174
    ics_file = forms.FileField(label=_('ICS File'), required=False,
174 175
                               help_text=_('ICS file containing events which will be considered as exceptions'))
176
    ics_url = forms.URLField(label=_('URL'), required=False,
177
                               help_text=_('URL to remote calendar which will be synchronised hourly'))
chrono/manager/templates/chrono/manager_import_exceptions.html
13 13
{% block content %}
14 14

  
15 15
<form method="post" enctype="multipart/form-data">
16
  <p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p>
16 17
  {% csrf_token %}
17 18
  {{ form.as_p }}
18 19
  <p>
chrono/manager/views.py
394 394
    form_class = ExceptionsImportForm
395 395
    template_name = 'chrono/manager_import_exceptions.html'
396 396

  
397
    def get_initial(self):
        """Pre-fill the URL field with the remote calendar URL stored on the object."""
        current = self.get_object()
        return {'ics_url': current.timeperiod_exceptions_remote_url}
399

  
397 400
    def form_valid(self, form):
401
        exceptions = None
398 402
        try:
399
            exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file'])
403
            if form.cleaned_data['ics_file']:
404
                exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file'])
405
            elif form.cleaned_data['ics_url']:
406
                exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url'])
407
            else:
408
                form.instance.remove_timeperiod_exceptions_from_remote_ics()
400 409
        except ICSError as e:
401 410
            form.add_error(None, unicode(e))
402 411
            return self.form_invalid(form)
403
        message = ungettext('An exception has been imported.',
404
                            '%(count)d exceptions have been imported.', exceptions)
405
        message = message % {'count': exceptions}
406
        messages.info(self.request, message)
412
        form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url']
413
        form.instance.save()
414
        if exceptions is not None:
415
            message = ungettext('An exception has been imported.',
416
                                '%(count)d exceptions have been imported.', exceptions)
417
            message = message % {'count': exceptions}
418
            messages.info(self.request, message)
407 419
        return super(DeskImportTimePeriodExceptionsView, self).form_valid(form)
408 420

  
409 421
desk_import_time_period_exceptions = DeskImportTimePeriodExceptionsView.as_view()
debian/chrono.cron.d
1
MAILTO=root
2

  
3
0 * * * * /sbin/runuser -u chrono /usr/bin/chrono-manage -- tenant_command sync_desks_timeperiod_exceptions --all-tenants
debian/control
11 11
Depends: ${misc:Depends}, ${python:Depends},
12 12
    python-django (>= 1.8),
13 13
    python-gadjo,
14
    python-intervaltree
14
    python-intervaltree,
15
    python-requests
15 16
Recommends: python-django-mellon
16 17
Description: Agendas System (Python module)
17 18

  
requirements.txt
3 3
djangorestframework>=3.1, <3.7
4 4
django-jsonfield >= 0.9.3
5 5
intervaltree
6
requests
7
vobject
setup.py
107 107
        'djangorestframework>=3.1, <3.7',
108 108
        'django-jsonfield >= 0.9.3',
109 109
        'intervaltree',
110
        'vobject'
110
        'vobject',
111
        'requests'
111 112
        ],
112 113
    zip_safe=False,
113 114
    cmdclass={
tests/test_agendas.py
1 1
import pytest
2 2
import datetime
3
import logging
4
import mock
5
import requests
6

  
3 7

  
4 8
from django.utils.timezone import now, make_aware, localtime
9
from django.core.management import call_command
10
from django.core.management.base import CommandError
5 11

  
6 12
from chrono.agendas.models import (Agenda, Event, Booking, MeetingType,
7 13
                        Desk, TimePeriodException, ICSError)
......
22 28
BEGIN:VEVENT
23 29
DTSTAMP:20170824T092855Z
24 30
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
25
DTSTART:20170831T180800Z
26
DTEND:20170831T213400Z
31
DTSTART:20170830T180800Z
32
DTEND:20170831T223400Z
27 33
SEQUENCE:2
28 34
SUMMARY:Event 2
29 35
END:VEVENT
......
43 49
DTSTAMP:20170824T082855Z
44 50
DTSTART:20180101
45 51
DTEND:20180101
46
SUMMARY:New eve
52
SUMMARY:New Year's Eve
47 53
RRULE:FREQ=YEARLY
48 54
END:VEVENT
49 55
END:VCALENDAR"""
......
212 218
        exceptions_count = desk.create_timeperiod_exceptions_from_ics(INVALID_ICS_SAMPLE)
213 219
    assert str(e.value) == 'File format is invalid.'
214 220

  
215
def test_timeexception_create_from_ics_with_no_events():
216
    agenda = Agenda(label=u'Test 7 agenda')
221
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_remote_ics(mocked_get):
    """Import a remote calendar, then an empty one, and check the cleanup."""
    agenda = Agenda(label=u'Test 8 agenda')
    agenda.save()
    desk = Desk(label='Test 8 desk', agenda=agenda)
    desk.save()
    mocked_response = mock.Mock()
    mocked_response.text = ICS_SAMPLE
    mocked_get.return_value = mocked_response
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert exceptions_count == 2
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
    mocked_get.return_value = mocked_response
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert exceptions_count == 0
    # external ids are stored as "desk-<id>:<uid>", so a prefix lookup is
    # required; the result must actually be asserted to check anything
    assert TimePeriodException.objects.filter(
        external_id__startswith='desk-%s:' % desk.id).count() == 0
237

  
238
@mock.patch('chrono.agendas.models.requests.get')
239
def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
240
    agenda = Agenda(label=u'Test 9 agenda')
217 241
    agenda.save()
218
    desk = Desk(label='Test 7 desk', agenda=agenda)
242
    desk = Desk(label='Test 9 desk', agenda=agenda)
219 243
    desk.save()
244
    mocked_response = mock.Mock()
245
    mocked_response.text = ICS_SAMPLE
246
    mocked_get.return_value = mocked_response
247
    def mocked_requests_connection_error(*args, **kwargs):
248
        raise requests.ConnectionError('unreachable')
249
    mocked_get.side_effect = mocked_requests_connection_error
220 250
    with pytest.raises(ICSError) as e:
221
        exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS)
222
    assert str(e.value) == "The file doesn't contain any events."
251
        exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
252
    assert str(e.value) == "Failed to retrieve remote calendar (unreachable)."
253

  
254
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
    """An HTTP 403 from the remote server is reported as an ICSError."""
    agenda = Agenda(label=u'Test 10 agenda')
    agenda.save()
    desk = Desk(label='Test 10 desk', agenda=agenda)
    desk.save()

    forbidden_response = mock.Mock(status_code=403)
    mocked_get.return_value = forbidden_response
    # an exception instance used as side_effect is raised on each call
    mocked_get.side_effect = requests.HTTPError(response=forbidden_response)

    with pytest.raises(ICSError) as e:
        desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert str(e.value) == "Failed to retrieve remote calendar (HTTP error 403)."
270

  
271
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, caplog):
    """The sync command fails with CommandError when the calendar is forbidden."""
    agenda = Agenda(label=u'Test 11 agenda')
    agenda.save()
    # well-formed URL: the scheme must be followed by "//"
    desk = Desk(label='Test 11 desk', agenda=agenda,
                timeperiod_exceptions_remote_url='http://example.com/sample.ics')
    desk.save()
    mocked_response = mock.Mock()
    mocked_response.status_code = 403
    mocked_get.return_value = mocked_response

    def mocked_requests_http_forbidden_error(*args, **kwargs):
        raise requests.HTTPError(response=mocked_response)
    mocked_get.side_effect = mocked_requests_http_forbidden_error
    with pytest.raises(CommandError) as e:
        call_command('sync_desks_timeperiod_exceptions')
    assert str(e.value) == 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (HTTP error 403).'
286

  
287
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
    """Exceptions missing from a later version of the calendar are removed."""
    agenda = Agenda(label=u'Test 11 agenda')
    agenda.save()
    # well-formed URL: the scheme must be followed by "//"
    desk = Desk(label='Test 11 desk', agenda=agenda,
                timeperiod_exceptions_remote_url='http://example.com/sample.ics')
    desk.save()
    mocked_response = mock.Mock()
    mocked_response.text = ICS_SAMPLE
    mocked_get.return_value = mocked_response
    call_command('sync_desks_timeperiod_exceptions')
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
    # the remote calendar now holds a single, brand new event
    mocked_response = mock.Mock()
    mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20180824T082855Z
UID:new-and-unique-uid
DTSTART:20180831T170800Z
DTEND:20180831T203400Z
SUMMARY:Wonderfull event
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = mocked_response
    call_command('sync_desks_timeperiod_exceptions')
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
    exception = TimePeriodException.objects.get(desk=desk)
    assert exception.external_id == 'desk-%s:new-and-unique-uid' % desk.id
tests/test_manager.py
3 3
from django.contrib.auth.models import User, Group
4 4
from django.utils.timezone import make_aware, now, localtime
5 5
import datetime
6
import mock
6 7
import pytest
8
import requests
7 9
from webtest import TestApp, Upload
8 10

  
9 11
from chrono.wsgi import application
......
699 701
    assert 'Desk A' in resp.text
700 702
    assert 'Desk B' in resp.text
701 703

  
702

  
703 704
def test_meetings_agenda_delete_desk(app, admin_user):
704 705
    app = login(app)
705 706
    resp = app.get('/manage/', status=200)
......
840 841
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
841 842
    assert 'Import exceptions from .ics' in resp.content
842 843
    resp = resp.click('upload')
844
    assert "You can upload a file or specify an address to a remote calendar." in resp
845
    resp = resp.form.submit(status=302)
846
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
847
    resp = resp.click('upload')
843 848
    resp.form['ics_file'] = Upload('exceptions.ics', 'invalid content', 'text/calendar')
844 849
    resp = resp.form.submit(status=200)
845 850
    assert 'File format is invalid' in resp.content
......
849 854
BEGIN:VEVENT
850 855
DTSTART:20180101
851 856
DTEND:20180101
852
SUMMARY:New eve
857
SUMMARY:New Year's Eve
853 858
RRULE:FREQ=YEARLY
854 859
END:VEVENT
855 860
END:VCALENDAR"""
......
861 866
PRODID:-//foo.bar//EN
862 867
BEGIN:VEVENT
863 868
DTEND:20180101
864
SUMMARY:New eve
869
SUMMARY:New Year's Eve
865 870
END:VEVENT
866 871
END:VCALENDAR"""
867 872
    resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_start_date, 'text/calendar')
868 873
    resp = resp.form.submit(status=200)
869
    assert 'Event &quot;New eve&quot; has no start date.' in resp.content
874
    assert 'Event &quot;New Year&#39;s Eve&quot; has no start date.' in resp.content
870 875
    ics_with_no_events = """BEGIN:VCALENDAR
871 876
VERSION:2.0
872 877
PRODID:-//foo.bar//EN
873 878
END:VCALENDAR"""
874 879
    resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_events, 'text/calendar')
875
    resp = resp.form.submit(status=200)
876
    assert "The file doesn&#39;t contain any events." in resp.content
880
    resp = resp.form.submit(status=302)
881
    resp = resp.follow()
882
    assert "0 exceptions have been imported." in resp.content
877 883

  
878 884
    ics_with_exceptions = """BEGIN:VCALENDAR
879 885
VERSION:2.0
......
881 887
BEGIN:VEVENT
882 888
DTSTART:20180101
883 889
DTEND:20180101
884
SUMMARY:New eve
890
SUMMARY:New Year's Eve
885 891
END:VEVENT
886 892
END:VCALENDAR"""
893
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
894
    resp = resp.click('upload')
887 895
    resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
888 896
    resp = resp.form.submit(status=302)
889
    assert TimePeriodException.objects.count() == 1
897
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
890 898
    resp = resp.follow()
891 899
    assert 'An exception has been imported.' in resp.content
900

  
901
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_with_remote_ics(mocked_get, app, admin_user):
    """Submit a remote calendar URL, then clear it, through the manager form."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    login(app)
    agenda_page = '/manage/agendas/%d/' % agenda.pk
    resp = app.get(agenda_page)
    assert 'Import exceptions from .ics' not in resp.content

    TimePeriod.objects.create(
        weekday=1, desk=desk,
        start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    # remote server answers with a calendar holding one identified event
    remote_calendar = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:random-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = mock.Mock(text=remote_calendar)

    resp = app.get(agenda_page)
    resp = resp.click('upload')
    assert 'ics_file' in resp.form.fields
    assert 'ics_url' in resp.form.fields
    resp.form['ics_url'] = 'http://example.com/foo.ics'
    resp = resp.form.submit(status=302)
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
    exception = TimePeriodException.objects.get(desk=desk)
    assert exception.external_id == 'desk-%s:random-event-id' % desk.id

    # submitting an empty URL drops the exception imported from the calendar
    resp = app.get(agenda_page)
    resp = resp.click('upload')
    resp.form['ics_url'] = ''
    resp = resp.form.submit(status=302)
    leftovers = TimePeriodException.objects.filter(
        desk=desk, external_id='desk-%s:random-event-id' % desk.id)
    assert leftovers.count() == 0
941

  
942

  
943
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
    """Submit the same URL twice while the remote calendar loses an event."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    login(app)
    agenda_page = '/manage/agendas/%d/' % agenda.pk
    resp = app.get(agenda_page)
    assert 'Import exceptions from .ics' not in resp.content

    TimePeriod.objects.create(
        weekday=1, desk=desk,
        start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    # first version of the remote calendar: two events
    mocked_response = mock.Mock()
    mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:first-eventrandom-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:First test event
END:VEVENT
BEGIN:VEVENT
UID:second-eventrandom-event-id
DTSTART:20190101
DTEND:20190101
SUMMARY:Second test event
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = mocked_response
    resp = app.get(agenda_page)
    resp = resp.click('upload')
    resp.form['ics_url'] = 'http://example.com/foo.ics'
    resp = resp.form.submit(status=302)
    assert TimePeriodException.objects.filter(desk=desk).count() == 2

    # second version of the remote calendar: a single event
    mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:secord-eventrandom-event-id
DTSTART:20190101
DTEND:20190101
SUMMARY:Second test event
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = mocked_response
    resp = app.get(agenda_page)
    resp = resp.click('upload')
    resp.form['ics_url'] = 'http://example.com/foo.ics'
    resp = resp.form.submit(status=302)
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
994

  
995
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_remote_ics_with_connection_error(mocked_get, app, admin_user):
    """A connection failure is displayed as a form error."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    login(app)
    agenda_page = '/manage/agendas/%d/' % agenda.pk
    resp = app.get(agenda_page)
    assert 'Import exceptions from .ics' not in resp.content

    TimePeriod.objects.create(
        weekday=1, desk=desk,
        start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    mocked_get.return_value = mock.Mock()
    # an exception instance used as side_effect is raised on each call
    mocked_get.side_effect = requests.exceptions.ConnectionError('unreachable')

    resp = app.get(agenda_page)
    resp = resp.click('upload')
    assert 'ics_file' in resp.form.fields
    assert 'ics_url' in resp.form.fields
    resp.form['ics_url'] = 'http://example.com/foo.ics'
    resp = resp.form.submit(status=200)
    assert 'Failed to retrieve remote calendar (unreachable).' in resp.content
1020

  
1021
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_forbidden_remote_ics(mocked_get, app, admin_user):
    """An HTTP 403 from the remote server is displayed as a form error."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    login(app)
    agenda_page = '/manage/agendas/%d/' % agenda.pk
    resp = app.get(agenda_page)
    assert 'Import exceptions from .ics' not in resp.content

    TimePeriod.objects.create(
        weekday=1, desk=desk,
        start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    forbidden_response = mock.Mock(status_code=403)
    mocked_get.return_value = forbidden_response
    # an exception instance used as side_effect is raised on each call
    mocked_get.side_effect = requests.exceptions.HTTPError(response=forbidden_response)

    resp = app.get(agenda_page)
    resp = resp.click('upload')
    resp.form['ics_url'] = 'http://example.com/foo.ics'
    resp = resp.form.submit(status=200)
    assert 'Failed to retrieve remote calendar (HTTP error 403).' in resp.content
1044

  
1045
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_remote_ics_with_ssl_error(mocked_get, app, admin_user):
    """An SSL failure is displayed as a form error."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    login(app)
    agenda_page = '/manage/agendas/%d/' % agenda.pk
    resp = app.get(agenda_page)
    assert 'Import exceptions from .ics' not in resp.content
    TimePeriod.objects.create(
        weekday=1, desk=desk,
        start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    mocked_get.return_value = mock.Mock()
    # an exception instance used as side_effect is raised on each call
    mocked_get.side_effect = requests.exceptions.SSLError('SSL error')

    resp = app.get(agenda_page)
    resp = resp.click('upload')
    resp.form['ics_url'] = 'https://example.com/foo.ics'
    resp = resp.form.submit(status=200)
    assert 'Failed to retrieve remote calendar (SSL error).' in resp.content
892
-