Projet

Général

Profil

0001-agenda-add-support-for-remote-calendar-file-with-exc.patch

Serghei Mihai, 27 octobre 2017 13:49

Télécharger (33,3 ko)

Voir les différences:

Subject: [PATCH] agenda: add support for remote calendar file with exceptions
 (#19070)

Remote calendars can be specified by desk and updated hourly, or used once.
 chrono/agendas/management/__init__.py              |   0
 chrono/agendas/management/commands/__init__.py     |   0
 .../commands/sync_desks_timeperiod_exceptions.py   |  32 +++
 .../agendas/migrations/0020_auto_20171019_1729.py  |  32 +++
 chrono/agendas/models.py                           |  56 ++++-
 chrono/manager/forms.py                            |  11 +-
 .../chrono/manager_import_exceptions.html          |   1 +
 chrono/manager/views.py                            |  22 +-
 debian/chrono.cron.d                               |   3 +
 debian/control                                     |   3 +-
 requirements.txt                                   |   2 +
 setup.py                                           |   3 +-
 tests/test_agendas.py                              | 117 ++++++++++-
 tests/test_manager.py                              | 226 ++++++++++++++++++++-
 14 files changed, 480 insertions(+), 28 deletions(-)
 create mode 100644 chrono/agendas/management/__init__.py
 create mode 100644 chrono/agendas/management/commands/__init__.py
 create mode 100644 chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py
 create mode 100644 chrono/agendas/migrations/0020_auto_20171019_1729.py
 create mode 100644 debian/chrono.cron.d
chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py
1
# chrono - agendas system
2
# Copyright (C) 2016-2017  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import logging
18

  
19
from chrono.agendas.models import Desk, ICSError
20
from django.core.management.base import BaseCommand, CommandError
21

  
22

  
23
class Command(BaseCommand):
24
    help = 'Synchronize time period exceptions from desks remote ics'
25

  
26
    def handle(self, **options):
27
        logger = logging.getLogger(__name__)
28
        for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
29
            try:
30
                desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
31
            except ICSError as e:
32
                raise CommandError(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e))
chrono/agendas/migrations/0020_auto_20171019_1729.py
1
# -*- coding: utf-8 -*-
2
from __future__ import unicode_literals
3

  
4
from django.db import migrations, models
5
import datetime
6
from django.utils.timezone import utc
7

  
8

  
9
class Migration(migrations.Migration):
10

  
11
    dependencies = [
12
        ('agendas', '0019_timeperiodexception'),
13
    ]
14

  
15
    operations = [
16
        migrations.AddField(
17
            model_name='desk',
18
            name='timeperiod_exceptions_remote_url',
19
            field=models.URLField(verbose_name='URL to fetch time period exceptions from', blank=True),
20
        ),
21
        migrations.AddField(
22
            model_name='timeperiodexception',
23
            name='creation_date',
24
            field=models.DateTimeField(default=datetime.datetime(2017, 10, 19, 17, 29, 53, 280020, tzinfo=utc), auto_now=True),
25
            preserve_default=False,
26
        ),
27
        migrations.AddField(
28
            model_name='timeperiodexception',
29
            name='external_id',
30
            field=models.CharField(max_length=256, null=True, verbose_name='External ID'),
31
        ),
32
    ]
chrono/agendas/models.py
16 16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 17

  
18 18
import datetime
19
import requests
19 20
import vobject
20 21

  
21 22
from django.contrib.auth.models import Group
......
28 29
from django.utils.encoding import force_text
29 30
from django.utils.formats import date_format, get_format
30 31
from django.utils.text import slugify
31
from django.utils.timezone import localtime, now, make_aware, make_naive
32
from django.utils.timezone import localtime, now, make_aware, make_naive, is_aware
32 33
from django.utils.translation import ugettext_lazy as _
33 34

  
34 35
from jsonfield import JSONField
......
358 359
    agenda = models.ForeignKey(Agenda)
359 360
    label = models.CharField(_('Label'), max_length=150)
360 361
    slug = models.SlugField(_('Identifier'), max_length=150)
362
    timeperiod_exceptions_remote_url = models.URLField(_('URL to fetch time period exceptions from'),
363
                                    blank=True)
361 364

  
362 365
    def __unicode__(self):
363 366
        return self.label
......
417 420
        in_two_weeks = self.get_exceptions_within_two_weeks()
418 421
        return self.timeperiodexception_set.count() == len(in_two_weeks)
419 422

  
420
    def create_timeperiod_exceptions_from_ics(self, data):
423
    def create_timeperiod_exceptions_from_remote_ics(self, url):
424
        try:
425
            response = requests.get(url)
426
            response.raise_for_status()
427
        except requests.HTTPError as e:
428
            raise ICSError(_('Failed to retrieve remote calendar (HTTP error %s).') % e.response.status_code)
429
        except requests.RequestException as e:
430
            raise ICSError(_('Failed to retrieve remote calendar (%s).') % e)
431

  
432
        return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)
433

  
434
    def remove_timeperiod_exceptions_from_remote_ics(self):
435
        TimePeriodException.objects.filter(external_id__startswith='desk-%s:' % self.id).delete()
436

  
437
    def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False):
421 438
        try:
422 439
            parsed = vobject.readOne(data)
423 440
        except vobject.base.ParseError:
......
426 443
        total_created = 0
427 444

  
428 445
        if not parsed.contents.get('vevent'):
429
            raise ICSError(_('The file doesn\'t contain any events.'))
446
            if keep_synced_by_uid:
447
                TimePeriodException.objects.filter(external_id__startswith='desk-%s:' % self.id).delete()
448
                return total_created
449
            else:
450
                raise ICSError(_('The file doesn\'t contain any events.'))
430 451

  
431 452
        with transaction.atomic():
453
            update_time = now()
432 454
            for vevent in parsed.contents['vevent']:
433 455
                event = {}
456

  
434 457
                summary = vevent.contents['summary'][0].value
435 458
                if not isinstance(summary, unicode):
436 459
                    summary = unicode(summary, 'utf-8')
......
441 464
                    if not isinstance(start_dt, datetime.datetime):
442 465
                        start_dt = datetime.datetime.combine(start_dt,
443 466
                                                datetime.datetime.min.time())
444
                    event['start_datetime'] = start_dt
467
                    if not is_aware(start_dt):
468
                        event['start_datetime'] = make_aware(start_dt)
469
                    else:
470
                        event['start_datetime'] = start_dt
445 471
                except AttributeError:
446 472
                    raise ICSError(_('Event "%s" has no start date.') % summary)
447 473
                try:
......
452 478
                except AttributeError:
453 479
                    # events without end date are considered as ending the same day
454 480
                    end_dt = datetime.datetime.combine(start_dt, datetime.datetime.max.time())
455
                event['end_datetime'] = end_dt
456

  
457
                obj, created = TimePeriodException.objects.get_or_create(desk=self, label=summary,
458
                                                                         **event)
481
                if not is_aware(end_dt):
482
                    event['end_datetime'] = make_aware(end_dt)
483
                else:
484
                    event['end_datetime'] = end_dt
485
                if keep_synced_by_uid:
486
                    external_id = 'desk-%s:%s' % (self.id, vevent.contents['uid'][0].value)
487
                    event['label'] = summary
488
                    obj, created = TimePeriodException.objects.update_or_create(desk=self, external_id=external_id,
489
                                                                    defaults=event)
490
                else:
491
                    obj, created = TimePeriodException.objects.update_or_create(desk=self, label=summary, defaults=event)
492
                # return total_created
459 493
                if created:
460 494
                    total_created += 1
461 495

  
496
            # delete all outdated exceptions from remote calendar
497
            TimePeriodException.objects.filter(creation_date__lt=update_time,
498
                                external_id__startswith='desk-%s:' % self.id).delete()
499

  
462 500
        return total_created
463 501

  
464 502

  
465 503
class TimePeriodException(models.Model):
466 504
    desk = models.ForeignKey(Desk)
505
    external_id = models.CharField(_('External ID'), max_length=256, null=True)
467 506
    label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
468 507
    start_datetime = models.DateTimeField(_('Exception start time'))
469 508
    end_datetime = models.DateTimeField(_('Exception end time'))
509
    creation_date = models.DateTimeField(auto_now=True)
470 510

  
471 511
    class Meta:
472 512
        ordering = ['start_datetime']
chrono/manager/forms.py
16 16

  
17 17
import csv
18 18
import datetime
19
import requests
19 20

  
20 21
from django import forms
21 22
from django.forms import ValidationError
22 23
from django.utils.translation import ugettext_lazy as _
23 24

  
24 25
from chrono.agendas.models import (Event, MeetingType, TimePeriod, Desk,
25
                                   TimePeriodException)
26
                                   TimePeriodException, ICSError)
26 27

  
27 28
from . import widgets
28 29

  
......
83 84
        widgets = {
84 85
            'agenda': forms.HiddenInput(),
85 86
        }
86
        exclude = ['slug']
87
        exclude = ['slug', 'timeperiod_exceptions_remote_url']
87 88

  
88 89

  
89 90
class DeskForm(forms.ModelForm):
......
92 93
        widgets = {
93 94
            'agenda': forms.HiddenInput(),
94 95
        }
95
        exclude = []
96
        exclude = ['timeperiod_exceptions_remote_url']
96 97

  
97 98

  
98 99
class TimePeriodExceptionForm(forms.ModelForm):
......
170 171
        model = Desk
171 172
        fields = []
172 173

  
173
    ics_file = forms.FileField(label=_('ICS File'),
174
    ics_file = forms.FileField(label=_('ICS File'), required=False,
174 175
                               help_text=_('ICS file containing events which will be considered as exceptions'))
176
    ics_url = forms.URLField(label=_('URL'), required=False,
177
                               help_text=_('URL to remote calendar which will be synchronised hourly'))
chrono/manager/templates/chrono/manager_import_exceptions.html
13 13
{% block content %}
14 14

  
15 15
<form method="post" enctype="multipart/form-data">
16
  <p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p>
16 17
  {% csrf_token %}
17 18
  {{ form.as_p }}
18 19
  <p>
chrono/manager/views.py
394 394
    form_class = ExceptionsImportForm
395 395
    template_name = 'chrono/manager_import_exceptions.html'
396 396

  
397
    def get_initial(self):
398
        return {'ics_url': self.get_object().timeperiod_exceptions_remote_url}
399

  
397 400
    def form_valid(self, form):
401
        exceptions = None
398 402
        try:
399
            exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file'])
403
            if form.cleaned_data['ics_file']:
404
                exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file'])
405
            elif form.cleaned_data['ics_url']:
406
                exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url'])
407
            else:
408
                form.instance.remove_timeperiod_exceptions_from_remote_ics()
400 409
        except ICSError as e:
401 410
            form.add_error(None, unicode(e))
402 411
            return self.form_invalid(form)
403
        message = ungettext('An exception has been imported.',
404
                            '%(count)d exceptions have been imported.', exceptions)
405
        message = message % {'count': exceptions}
406
        messages.info(self.request, message)
412
        form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url']
413
        form.instance.save()
414
        if exceptions is not None:
415
            message = ungettext('An exception has been imported.',
416
                                '%(count)d exceptions have been imported.', exceptions)
417
            message = message % {'count': exceptions}
418
            messages.info(self.request, message)
407 419
        return super(DeskImportTimePeriodExceptionsView, self).form_valid(form)
408 420

  
409 421
desk_import_time_period_exceptions = DeskImportTimePeriodExceptionsView.as_view()
debian/chrono.cron.d
1
MAILTO=root
2

  
3
0 * * * * /sbin/runuser -u chrono /usr/bin/chrono-manage -- tenant_command sync_desks_timeperiod_exceptions --all-tenants
debian/control
11 11
Depends: ${misc:Depends}, ${python:Depends},
12 12
    python-django (>= 1.8),
13 13
    python-gadjo,
14
    python-intervaltree
14
    python-intervaltree,
15
    python-requests
15 16
Recommends: python-django-mellon
16 17
Description: Agendas System (Python module)
17 18

  
requirements.txt
3 3
djangorestframework>=3.1, <3.7
4 4
django-jsonfield >= 0.9.3
5 5
intervaltree
6
requests
7
vobject
setup.py
107 107
        'djangorestframework>=3.1, <3.7',
108 108
        'django-jsonfield >= 0.9.3',
109 109
        'intervaltree',
110
        'vobject'
110
        'vobject',
111
        'requests'
111 112
        ],
112 113
    zip_safe=False,
113 114
    cmdclass={
tests/test_agendas.py
1 1
import pytest
2 2
import datetime
3
import logging
4
import mock
5
import re
6
import requests
7

  
3 8

  
4 9
from django.utils.timezone import now, make_aware, localtime
10
from django.core.management import call_command
11
from django.core.management.base import CommandError
5 12

  
6 13
from chrono.agendas.models import (Agenda, Event, Booking, MeetingType,
7 14
                        Desk, TimePeriodException, ICSError)
......
22 29
BEGIN:VEVENT
23 30
DTSTAMP:20170824T092855Z
24 31
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
25
DTSTART:20170831T180800Z
26
DTEND:20170831T213400Z
32
DTSTART:20170830T180800Z
33
DTEND:20170831T223400Z
27 34
SEQUENCE:2
28 35
SUMMARY:Event 2
29 36
END:VEVENT
......
43 50
DTSTAMP:20170824T082855Z
44 51
DTSTART:20180101
45 52
DTEND:20180101
46
SUMMARY:New eve
53
SUMMARY:New Year's Eve
47 54
RRULE:FREQ=YEARLY
48 55
END:VEVENT
49 56
END:VCALENDAR"""
......
220 227
    with pytest.raises(ICSError) as e:
221 228
        exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS)
222 229
    assert str(e.value) == "The file doesn't contain any events."
230

  
231
@mock.patch('chrono.agendas.models.requests.get')
232
def test_timeperiodexception_creation_from_remote_ics(mocked_get):
233
    agenda = Agenda(label=u'Test 8 agenda')
234
    agenda.save()
235
    desk = Desk(label='Test 8 desk', agenda=agenda)
236
    desk.save()
237
    mocked_response = mock.Mock()
238
    mocked_response.text = ICS_SAMPLE
239
    mocked_get.return_value = mocked_response
240
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
241
    assert exceptions_count == 2
242
    mocked_response.text = re.sub('SUMMARY:\w+', 'SUMMARY:New summmary', ICS_SAMPLE, re.MULTILINE)
243
    mocked_get.return_value = mocked_response
244
    desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
245
    for timeperiod in TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id, desk=desk):
246
        assert 'New summary ' in timeperiod.label
247

  
248
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
249
    mocked_get.return_value = mocked_response
250
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
251
    assert exceptions_count == 0
252
    TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id).count() == 0
253

  
254
@mock.patch('chrono.agendas.models.requests.get')
255
def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
256
    agenda = Agenda(label=u'Test 9 agenda')
257
    agenda.save()
258
    desk = Desk(label='Test 9 desk', agenda=agenda)
259
    desk.save()
260
    mocked_response = mock.Mock()
261
    mocked_response.text = ICS_SAMPLE
262
    mocked_get.return_value = mocked_response
263
    def mocked_requests_connection_error(*args, **kwargs):
264
        raise requests.ConnectionError('unreachable')
265
    mocked_get.side_effect = mocked_requests_connection_error
266
    with pytest.raises(ICSError) as e:
267
        exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
268
    assert str(e.value) == "Failed to retrieve remote calendar (unreachable)."
269

  
270
@mock.patch('chrono.agendas.models.requests.get')
271
def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
272
    agenda = Agenda(label=u'Test 10 agenda')
273
    agenda.save()
274
    desk = Desk(label='Test 10 desk', agenda=agenda)
275
    desk.save()
276
    mocked_response = mock.Mock()
277
    mocked_response.status_code = 403
278
    mocked_get.return_value = mocked_response
279
    def mocked_requests_http_forbidden_error(*args, **kwargs):
280
        raise requests.HTTPError(response=mocked_response)
281
    mocked_get.side_effect = mocked_requests_http_forbidden_error
282

  
283
    with pytest.raises(ICSError) as e:
284
        exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
285
    assert str(e.value) == "Failed to retrieve remote calendar (HTTP error 403)."
286

  
287
@mock.patch('chrono.agendas.models.requests.get')
288
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, caplog):
289
    agenda = Agenda(label=u'Test 11 agenda')
290
    agenda.save()
291
    desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics')
292
    desk.save()
293
    mocked_response = mock.Mock()
294
    mocked_response.status_code = 403
295
    mocked_get.return_value = mocked_response
296
    def mocked_requests_http_forbidden_error(*args, **kwargs):
297
        raise requests.HTTPError(response=mocked_response)
298
    mocked_get.side_effect = mocked_requests_http_forbidden_error
299
    with pytest.raises(CommandError) as e:
300
        call_command('sync_desks_timeperiod_exceptions')
301
    assert str(e.value) == 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (HTTP error 403).'
302

  
303
@mock.patch('chrono.agendas.models.requests.get')
304
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
305
    agenda = Agenda(label=u'Test 11 agenda')
306
    agenda.save()
307
    desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics')
308
    desk.save()
309
    mocked_response = mock.Mock()
310
    mocked_response.text = ICS_SAMPLE
311
    mocked_get.return_value = mocked_response
312
    call_command('sync_desks_timeperiod_exceptions')
313
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
314
    mocked_response.text = """BEGIN:VCALENDAR
315
VERSION:2.0
316
PRODID:-//foo.bar//EN
317
BEGIN:VEVENT
318
DTSTAMP:20180824T082855Z
319
UID:new-and-unique-uid
320
DTSTART:20180831T170800Z
321
DTEND:20180831T203400Z
322
SUMMARY:Wonderfull event
323
END:VEVENT
324
END:VCALENDAR"""
325
    mocked_get.return_value = mocked_response
326
    call_command('sync_desks_timeperiod_exceptions')
327
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
328
    exception = TimePeriodException.objects.get(desk=desk)
329
    assert exception.external_id == 'desk-%s:new-and-unique-uid' % desk.id
330
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
331
    mocked_get.return_value = mocked_response
332
    call_command('sync_desks_timeperiod_exceptions')
333
    assert not TimePeriodException.objects.filter(desk=desk, external_id__startswith='desk-%s:' % desk.id).exists()
tests/test_manager.py
3 3
from django.contrib.auth.models import User, Group
4 4
from django.utils.timezone import make_aware, now, localtime
5 5
import datetime
6
import mock
6 7
import pytest
8
import requests
7 9
from webtest import TestApp, Upload
8 10

  
9 11
from chrono.wsgi import application
......
699 701
    assert 'Desk A' in resp.text
700 702
    assert 'Desk B' in resp.text
701 703

  
702

  
703 704
def test_meetings_agenda_delete_desk(app, admin_user):
704 705
    app = login(app)
705 706
    resp = app.get('/manage/', status=200)
......
840 841
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
841 842
    assert 'Import exceptions from .ics' in resp.content
842 843
    resp = resp.click('upload')
844
    assert "You can upload a file or specify an address to a remote calendar." in resp
845
    resp = resp.form.submit(status=302)
846
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
847
    resp = resp.click('upload')
843 848
    resp.form['ics_file'] = Upload('exceptions.ics', 'invalid content', 'text/calendar')
844 849
    resp = resp.form.submit(status=200)
845 850
    assert 'File format is invalid' in resp.content
......
849 854
BEGIN:VEVENT
850 855
DTSTART:20180101
851 856
DTEND:20180101
852
SUMMARY:New eve
857
SUMMARY:New Year's Eve
853 858
RRULE:FREQ=YEARLY
854 859
END:VEVENT
855 860
END:VCALENDAR"""
......
861 866
PRODID:-//foo.bar//EN
862 867
BEGIN:VEVENT
863 868
DTEND:20180101
864
SUMMARY:New eve
869
SUMMARY:New Year's Eve
865 870
END:VEVENT
866 871
END:VCALENDAR"""
867 872
    resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_start_date, 'text/calendar')
868 873
    resp = resp.form.submit(status=200)
869
    assert 'Event &quot;New eve&quot; has no start date.' in resp.content
874
    assert 'Event &quot;New Year&#39;s Eve&quot; has no start date.' in resp.content
870 875
    ics_with_no_events = """BEGIN:VCALENDAR
871 876
VERSION:2.0
872 877
PRODID:-//foo.bar//EN
......
881 886
BEGIN:VEVENT
882 887
DTSTART:20180101
883 888
DTEND:20180101
884
SUMMARY:New eve
889
SUMMARY:New Year's Eve
885 890
END:VEVENT
886 891
END:VCALENDAR"""
892
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
893
    resp = resp.click('upload')
887 894
    resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
888 895
    resp = resp.form.submit(status=302)
889
    assert TimePeriodException.objects.count() == 1
896
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
890 897
    resp = resp.follow()
891 898
    assert 'An exception has been imported.' in resp.content
899

  
900
@mock.patch('chrono.agendas.models.requests.get')
901
def test_agenda_import_time_period_exception_with_remote_ics(mocked_get, app, admin_user):
902
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
903
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
904
    MeetingType(agenda=agenda, label='Bar').save()
905
    login(app)
906
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
907
    assert 'Import exceptions from .ics' not in resp.content
908

  
909
    TimePeriod.objects.create(weekday=1, desk=desk,
910
                start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
911

  
912
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
913
    resp = resp.click('upload')
914

  
915
    assert 'ics_file' in resp.form.fields
916
    assert 'ics_url' in resp.form.fields
917
    resp.form['ics_url'] = 'http://example.com/foo.ics'
918
    mocked_response = mock.Mock()
919
    mocked_response.text =  """BEGIN:VCALENDAR
920
VERSION:2.0
921
PRODID:-//foo.bar//EN
922
BEGIN:VEVENT
923
UID:random-event-id
924
DTSTART:20180101
925
DTEND:20180101
926
SUMMARY:New Year's Eve
927
END:VEVENT
928
END:VCALENDAR"""
929
    mocked_get.return_value = mocked_response
930
    resp = resp.form.submit(status=302)
931
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
932
    exception = TimePeriodException.objects.get(desk=desk)
933
    assert exception.external_id == 'desk-%s:random-event-id' % desk.id
934
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
935
    resp = resp.click('upload')
936
    resp.form['ics_url'] = ''
937
    resp = resp.form.submit(status=302)
938
    assert not TimePeriodException.objects.filter(desk=desk,
939
                    external_id='desk-%s:random-event-id' % desk.id).exists()
940

  
941
@mock.patch('chrono.agendas.models.requests.get')
942
def test_agenda_import_time_period_exception_with_remote_ics_no_events(mocked_get, app, admin_user):
943
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
944
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
945
    MeetingType(agenda=agenda, label='Bar').save()
946
    login(app)
947
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
948
    assert 'Import exceptions from .ics' not in resp.content
949

  
950
    TimePeriod.objects.create(weekday=1, desk=desk,
951
                start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
952

  
953
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
954
    resp = resp.click('upload')
955
    resp.form['ics_url'] = 'http://example.com/foo.ics'
956
    mocked_response = mock.Mock()
957
    mocked_response.text =  """BEGIN:VCALENDAR
958
VERSION:2.0
959
PRODID:-//foo.bar//EN
960
BEGIN:VEVENT
961
UID:random-event-id
962
DTSTART:20180101
963
DTEND:20180101
964
SUMMARY:New Year's Eve
965
END:VEVENT
966
END:VCALENDAR"""
967
    mocked_get.return_value = mocked_response
968
    resp = resp.form.submit(status=302)
969
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
970
    exception = TimePeriodException.objects.get(desk=desk)
971
    assert exception.external_id == 'desk-%s:random-event-id' % desk.id
972
    mocked_response.text = """BEGIN:VCALENDAR
973
VERSION:2.0
974
PRODID:-//foo.bar//EN
975
END:VCALENDAR"""
976
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
977
    resp = resp.click('upload')
978
    resp = resp.form.submit(status=302)
979
    assert not TimePeriodException.objects.filter(desk=desk,
980
                    external_id='desk-%s:random-event-id' % desk.id).exists()
981

  
982

  
983
@mock.patch('chrono.agendas.models.requests.get')
984
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
985
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
986
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
987
    MeetingType(agenda=agenda, label='Bar').save()
988
    login(app)
989
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
990
    assert 'Import exceptions from .ics' not in resp.content
991

  
992
    TimePeriod.objects.create(weekday=1, desk=desk,
993
                start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
994

  
995
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
996
    resp = resp.click('upload')
997
    resp.form['ics_url'] = 'http://example.com/foo.ics'
998
    mocked_response = mock.Mock()
999
    mocked_response.text =  """BEGIN:VCALENDAR
1000
VERSION:2.0
1001
PRODID:-//foo.bar//EN
1002
BEGIN:VEVENT
1003
UID:first-eventrandom-event-id
1004
DTSTART:20180101
1005
DTEND:20180101
1006
SUMMARY:First test event
1007
END:VEVENT
1008
BEGIN:VEVENT
1009
UID:second-eventrandom-event-id
1010
DTSTART:20190101
1011
DTEND:20190101
1012
SUMMARY:Second test event
1013
END:VEVENT
1014
END:VCALENDAR"""
1015
    mocked_get.return_value = mocked_response
1016
    resp = resp.form.submit(status=302)
1017
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
1018
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1019
    resp = resp.click('upload')
1020
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1021
    mocked_response.text = """BEGIN:VCALENDAR
1022
VERSION:2.0
1023
PRODID:-//foo.bar//EN
1024
BEGIN:VEVENT
1025
UID:secord-eventrandom-event-id
1026
DTSTART:20190101
1027
DTEND:20190101
1028
SUMMARY:Second test event
1029
END:VEVENT
1030
END:VCALENDAR"""
1031
    mocked_get.return_value = mocked_response
1032
    resp = resp.form.submit(status=302)
1033
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
1034

  
1035
@mock.patch('chrono.agendas.models.requests.get')
1036
def test_agenda_import_time_period_exception_from_remote_ics_with_connection_error(mocked_get, app, admin_user):
1037
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
1038
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
1039
    MeetingType(agenda=agenda, label='Bar').save()
1040
    login(app)
1041
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1042
    assert 'Import exceptions from .ics' not in resp.content
1043

  
1044
    TimePeriod.objects.create(weekday=1, desk=desk,
1045
                start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
1046

  
1047
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1048
    resp = resp.click('upload')
1049

  
1050
    assert 'ics_file' in resp.form.fields
1051
    assert 'ics_url' in resp.form.fields
1052
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1053
    mocked_response = mock.Mock()
1054
    mocked_get.return_value = mocked_response
1055
    def mocked_requests_connection_error(*args, **kwargs):
1056
        raise requests.exceptions.ConnectionError('unreachable')
1057
    mocked_get.side_effect = mocked_requests_connection_error
1058
    resp = resp.form.submit(status=200)
1059
    assert 'Failed to retrieve remote calendar (unreachable).' in resp.content
1060

  
1061
@mock.patch('chrono.agendas.models.requests.get')
1062
def test_agenda_import_time_period_exception_from_forbidden_remote_ics(mocked_get, app, admin_user):
1063
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
1064
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
1065
    MeetingType(agenda=agenda, label='Bar').save()
1066
    login(app)
1067
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1068
    assert 'Import exceptions from .ics' not in resp.content
1069

  
1070
    TimePeriod.objects.create(weekday=1, desk=desk,
1071
                start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
1072

  
1073
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1074
    resp = resp.click('upload')
1075
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1076
    mocked_response = mock.Mock()
1077
    mocked_response.status_code = 403
1078
    mocked_get.return_value = mocked_response
1079
    def mocked_requests_http_forbidden_error(*args, **kwargs):
1080
        raise requests.exceptions.HTTPError(response=mocked_response)
1081
    mocked_get.side_effect = mocked_requests_http_forbidden_error
1082
    resp = resp.form.submit(status=200)
1083
    assert 'Failed to retrieve remote calendar (HTTP error 403).' in resp.content
1084

  
1085
@mock.patch('chrono.agendas.models.requests.get')
1086
def test_agenda_import_time_period_exception_from_remote_ics_with_ssl_error(mocked_get, app, admin_user):
1087
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
1088
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
1089
    MeetingType(agenda=agenda, label='Bar').save()
1090
    login(app)
1091
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1092
    assert 'Import exceptions from .ics' not in resp.content
1093
    TimePeriod.objects.create(weekday=1, desk=desk,
1094
                start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
1095

  
1096
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
1097
    resp = resp.click('upload')
1098
    resp.form['ics_url'] = 'https://example.com/foo.ics'
1099
    mocked_response = mock.Mock()
1100
    mocked_get.return_value = mocked_response
1101
    def mocked_requests_http_ssl_error(*args, **kwargs):
1102
        raise requests.exceptions.SSLError('SSL error')
1103
    mocked_get.side_effect = mocked_requests_http_ssl_error
1104
    resp = resp.form.submit(status=200)
1105
    assert 'Failed to retrieve remote calendar (SSL error).' in resp.content
892
-