0001-agenda-add-support-for-remote-calendar-file-with-exc.patch
chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py | ||
---|---|---|
1 |
# chrono - agendas system |
|
2 |
# Copyright (C) 2016-2017 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import logging |
|
18 | ||
19 |
from chrono.agendas.models import Desk, ICSError |
|
20 |
from django.core.management.base import BaseCommand, CommandError |
|
21 | ||
22 | ||
23 |
class Command(BaseCommand):
    """Management command refreshing desks exceptions from their remote ics."""

    help = 'Synchronize time period exceptions from desks remote ics'

    def handle(self, **options):
        """Synchronize every desk that has a remote calendar URL configured.

        Every desk is attempted, even when an earlier one fails, so that a
        single unreachable or invalid calendar does not prevent the other
        desks from being refreshed.

        Raises:
            CommandError: once all desks have been processed, if at least one
                of them failed; the message holds one line per failing desk.
        """
        failures = []
        for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
            try:
                desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
            except ICSError as e:
                # remember the failure and carry on with the remaining desks
                failures.append(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e))
        if failures:
            raise CommandError(u'\n'.join(failures))
chrono/agendas/migrations/0020_auto_20171019_1729.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
from __future__ import unicode_literals |
|
3 | ||
4 |
from django.db import migrations, models |
|
5 |
import datetime |
|
6 |
from django.utils.timezone import utc |
|
7 | ||
8 | ||
9 |
class Migration(migrations.Migration):
    # Schema changes for remote calendar support: a remote URL on desks, plus
    # an external identifier and a timestamp on time period exceptions.

    dependencies = [
        ('agendas', '0019_timeperiodexception'),
    ]

    operations = [
        # URL of the remote calendar attached to a desk; may be left blank.
        migrations.AddField(
            model_name='desk',
            name='timeperiod_exceptions_remote_url',
            field=models.URLField(verbose_name='URL to fetch time period exceptions from', blank=True),
        ),
        # The fixed timestamp is a one-off default for this migration only
        # (preserve_default=False); the field itself relies on auto_now.
        migrations.AddField(
            model_name='timeperiodexception',
            name='creation_date',
            field=models.DateTimeField(default=datetime.datetime(2017, 10, 19, 17, 29, 53, 280020, tzinfo=utc), auto_now=True),
            preserve_default=False,
        ),
        # Nullable identifier, up to 256 characters.
        migrations.AddField(
            model_name='timeperiodexception',
            name='external_id',
            field=models.CharField(max_length=256, null=True, verbose_name='External ID'),
        ),
    ]
chrono/agendas/models.py | ||
---|---|---|
16 | 16 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | 17 | |
18 | 18 |
import datetime |
19 |
import requests |
|
19 | 20 |
import vobject |
20 | 21 | |
21 | 22 |
from django.contrib.auth.models import Group |
... | ... | |
28 | 29 |
from django.utils.encoding import force_text |
29 | 30 |
from django.utils.formats import date_format, get_format |
30 | 31 |
from django.utils.text import slugify |
31 |
from django.utils.timezone import localtime, now, make_aware, make_naive |
|
32 |
from django.utils.timezone import localtime, now, make_aware, make_naive, is_aware
|
|
32 | 33 |
from django.utils.translation import ugettext_lazy as _ |
33 | 34 | |
34 | 35 |
from jsonfield import JSONField |
... | ... | |
358 | 359 |
agenda = models.ForeignKey(Agenda) |
359 | 360 |
label = models.CharField(_('Label'), max_length=150) |
360 | 361 |
slug = models.SlugField(_('Identifier'), max_length=150) |
362 |
timeperiod_exceptions_remote_url = models.URLField(_('URL to fetch time period exceptions from'), |
|
363 |
blank=True) |
|
361 | 364 | |
362 | 365 |
def __unicode__(self): |
363 | 366 |
return self.label |
... | ... | |
417 | 420 |
in_two_weeks = self.get_exceptions_within_two_weeks() |
418 | 421 |
return self.timeperiodexception_set.count() == len(in_two_weeks) |
419 | 422 | |
420 |
def create_timeperiod_exceptions_from_ics(self, data): |
|
423 |
def create_timeperiod_exceptions_from_remote_ics(self, url):
    """Download the calendar at `url` and synchronize exceptions from it.

    The downloaded text is handed to create_timeperiod_exceptions_from_ics()
    with keep_synced_by_uid=True, and the value returned by that call is
    returned unchanged.

    Raises:
        ICSError: if the server answers with an HTTP error status, or if the
            request fails for any other reason (connection, SSL, timeout...).
    """
    try:
        # requests waits forever by default; bound the wait so that an
        # unresponsive server cannot hang the web request or the cron job.
        # A timeout is a RequestException and is reported as an ICSError.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
    except requests.HTTPError as e:
        raise ICSError(_('Failed to retrieve remote calendar (HTTP error %s).') % e.response.status_code)
    except requests.RequestException as e:
        raise ICSError(_('Failed to retrieve remote calendar (%s).') % e)

    return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)
|
433 | ||
434 |
def remove_timeperiod_exceptions_from_remote_ics(self):
    """Delete the exceptions whose external id starts with 'desk-<id>:'."""
    remote_prefix = 'desk-%s:' % self.id
    synced_exceptions = TimePeriodException.objects.filter(external_id__startswith=remote_prefix)
    synced_exceptions.delete()
|
436 | ||
437 |
def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False): |
|
421 | 438 |
try: |
422 | 439 |
parsed = vobject.readOne(data) |
423 | 440 |
except vobject.base.ParseError: |
... | ... | |
425 | 442 | |
426 | 443 |
total_created = 0 |
427 | 444 | |
428 |
if not parsed.contents.get('vevent'): |
|
445 |
if not parsed.contents.get('vevent') and not keep_synced_by_uid:
|
|
429 | 446 |
raise ICSError(_('The file doesn\'t contain any events.')) |
430 | 447 | |
431 | 448 |
with transaction.atomic(): |
432 |
for vevent in parsed.contents['vevent']: |
|
449 |
update_time = now() |
|
450 |
for vevent in parsed.contents.get('vevent', []): |
|
433 | 451 |
event = {} |
452 | ||
434 | 453 |
summary = vevent.contents['summary'][0].value |
435 | 454 |
if not isinstance(summary, unicode): |
436 | 455 |
summary = unicode(summary, 'utf-8') |
... | ... | |
441 | 460 |
if not isinstance(start_dt, datetime.datetime): |
442 | 461 |
start_dt = datetime.datetime.combine(start_dt, |
443 | 462 |
datetime.datetime.min.time()) |
444 |
event['start_datetime'] = start_dt |
|
463 |
if not is_aware(start_dt): |
|
464 |
event['start_datetime'] = make_aware(start_dt) |
|
465 |
else: |
|
466 |
event['start_datetime'] = start_dt |
|
445 | 467 |
except AttributeError: |
446 | 468 |
raise ICSError(_('Event "%s" has no start date.') % summary) |
447 | 469 |
try: |
... | ... | |
452 | 474 |
except AttributeError: |
453 | 475 |
# events without end date are considered as ending the same day |
454 | 476 |
end_dt = datetime.datetime.combine(start_dt, datetime.datetime.max.time()) |
455 |
event['end_datetime'] = end_dt |
|
456 | ||
457 |
obj, created = TimePeriodException.objects.get_or_create(desk=self, label=summary, |
|
458 |
**event) |
|
477 |
if not is_aware(end_dt): |
|
478 |
event['end_datetime'] = make_aware(end_dt) |
|
479 |
else: |
|
480 |
event['end_datetime'] = end_dt |
|
481 |
if keep_synced_by_uid: |
|
482 |
external_id = 'desk-%s:%s' % (self.id, vevent.contents['uid'][0].value) |
|
483 |
event['label'] = summary |
|
484 |
obj, created = TimePeriodException.objects.update_or_create(desk=self, external_id=external_id, |
|
485 |
defaults=event) |
|
486 |
else: |
|
487 |
obj, created = TimePeriodException.objects.update_or_create(desk=self, label=summary, defaults=event) |
|
488 |
# return total_created |
|
459 | 489 |
if created: |
460 | 490 |
total_created += 1 |
461 | 491 | |
492 |
# delete all outdated exceptions from remote calendar |
|
493 |
TimePeriodException.objects.filter(creation_date__lt=update_time, |
|
494 |
external_id__startswith='desk-%s:' % self.id).delete() |
|
495 | ||
462 | 496 |
return total_created |
463 | 497 | |
464 | 498 | |
465 | 499 |
class TimePeriodException(models.Model): |
466 | 500 |
desk = models.ForeignKey(Desk) |
501 |
external_id = models.CharField(_('External ID'), max_length=256, null=True) |
|
467 | 502 |
label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True) |
468 | 503 |
start_datetime = models.DateTimeField(_('Exception start time')) |
469 | 504 |
end_datetime = models.DateTimeField(_('Exception end time')) |
505 |
creation_date = models.DateTimeField(auto_now=True) |
|
470 | 506 | |
471 | 507 |
class Meta: |
472 | 508 |
ordering = ['start_datetime'] |
chrono/manager/forms.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import csv |
18 | 18 |
import datetime |
19 |
import requests |
|
19 | 20 | |
20 | 21 |
from django import forms |
21 | 22 |
from django.forms import ValidationError |
22 | 23 |
from django.utils.translation import ugettext_lazy as _ |
23 | 24 | |
24 | 25 |
from chrono.agendas.models import (Event, MeetingType, TimePeriod, Desk, |
25 |
TimePeriodException) |
|
26 |
TimePeriodException, ICSError)
|
|
26 | 27 | |
27 | 28 |
from . import widgets |
28 | 29 | |
... | ... | |
83 | 84 |
widgets = { |
84 | 85 |
'agenda': forms.HiddenInput(), |
85 | 86 |
} |
86 |
exclude = ['slug'] |
|
87 |
exclude = ['slug', 'timeperiod_exceptions_remote_url']
|
|
87 | 88 | |
88 | 89 | |
89 | 90 |
class DeskForm(forms.ModelForm): |
... | ... | |
92 | 93 |
widgets = { |
93 | 94 |
'agenda': forms.HiddenInput(), |
94 | 95 |
} |
95 |
exclude = [] |
|
96 |
exclude = ['timeperiod_exceptions_remote_url']
|
|
96 | 97 | |
97 | 98 | |
98 | 99 |
class TimePeriodExceptionForm(forms.ModelForm): |
... | ... | |
170 | 171 |
model = Desk |
171 | 172 |
fields = [] |
172 | 173 | |
173 |
ics_file = forms.FileField(label=_('ICS File'), |
|
174 |
ics_file = forms.FileField(label=_('ICS File'), required=False,
|
|
174 | 175 |
help_text=_('ICS file containing events which will be considered as exceptions')) |
176 |
ics_url = forms.URLField(label=_('URL'), required=False, |
|
177 |
help_text=_('URL to remote calendar which will be synchronised hourly')) |
chrono/manager/templates/chrono/manager_import_exceptions.html | ||
---|---|---|
13 | 13 |
{% block content %} |
14 | 14 | |
15 | 15 |
<form method="post" enctype="multipart/form-data"> |
16 |
<p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p> |
|
16 | 17 |
{% csrf_token %} |
17 | 18 |
{{ form.as_p }} |
18 | 19 |
<p> |
chrono/manager/views.py | ||
---|---|---|
394 | 394 |
form_class = ExceptionsImportForm |
395 | 395 |
template_name = 'chrono/manager_import_exceptions.html' |
396 | 396 | |
397 |
def get_initial(self):
    """Prefill the URL field with the remote calendar URL stored on the desk."""
    desk = self.get_object()
    return {'ics_url': desk.timeperiod_exceptions_remote_url}
|
399 | ||
397 | 400 |
def form_valid(self, form): |
401 |
exceptions = None |
|
398 | 402 |
try: |
399 |
exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file']) |
|
403 |
if form.cleaned_data['ics_file']: |
|
404 |
exceptions = form.instance.create_timeperiod_exceptions_from_ics(form.cleaned_data['ics_file']) |
|
405 |
elif form.cleaned_data['ics_url']: |
|
406 |
exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url']) |
|
407 |
else: |
|
408 |
form.instance.remove_timeperiod_exceptions_from_remote_ics() |
|
400 | 409 |
except ICSError as e: |
401 | 410 |
form.add_error(None, unicode(e)) |
402 | 411 |
return self.form_invalid(form) |
403 |
message = ungettext('An exception has been imported.', |
|
404 |
'%(count)d exceptions have been imported.', exceptions) |
|
405 |
message = message % {'count': exceptions} |
|
406 |
messages.info(self.request, message) |
|
412 |
form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url'] |
|
413 |
form.instance.save() |
|
414 |
if exceptions is not None: |
|
415 |
message = ungettext('An exception has been imported.', |
|
416 |
'%(count)d exceptions have been imported.', exceptions) |
|
417 |
message = message % {'count': exceptions} |
|
418 |
messages.info(self.request, message) |
|
407 | 419 |
return super(DeskImportTimePeriodExceptionsView, self).form_valid(form) |
408 | 420 | |
409 | 421 |
desk_import_time_period_exceptions = DeskImportTimePeriodExceptionsView.as_view() |
debian/chrono.cron.d | ||
---|---|---|
1 |
MAILTO=root

# Hourly synchronisation of desks exceptions from their remote calendars.
# Files in /etc/cron.d take a user field before the command; root is needed
# here because the command itself switches to the chrono user with runuser.
0 * * * * root /sbin/runuser -u chrono /usr/bin/chrono-manage -- tenant_command sync_desks_timeperiod_exceptions --all-tenants
debian/control | ||
---|---|---|
11 | 11 |
Depends: ${misc:Depends}, ${python:Depends}, |
12 | 12 |
python-django (>= 1.8), |
13 | 13 |
python-gadjo, |
14 |
python-intervaltree |
|
14 |
python-intervaltree, |
|
15 |
python-requests |
|
15 | 16 |
Recommends: python-django-mellon |
16 | 17 |
Description: Agendas System (Python module) |
17 | 18 |
requirements.txt | ||
---|---|---|
3 | 3 |
djangorestframework>=3.1, <3.7 |
4 | 4 |
django-jsonfield >= 0.9.3 |
5 | 5 |
intervaltree |
6 |
requests |
|
7 |
vobject |
setup.py | ||
---|---|---|
107 | 107 |
'djangorestframework>=3.1, <3.7', |
108 | 108 |
'django-jsonfield >= 0.9.3', |
109 | 109 |
'intervaltree', |
110 |
'vobject' |
|
110 |
'vobject', |
|
111 |
'requests' |
|
111 | 112 |
], |
112 | 113 |
zip_safe=False, |
113 | 114 |
cmdclass={ |
tests/test_agendas.py | ||
---|---|---|
1 | 1 |
import pytest |
2 | 2 |
import datetime |
3 |
import logging |
|
4 |
import mock |
|
5 |
import re |
|
6 |
import requests |
|
7 | ||
3 | 8 | |
4 | 9 |
from django.utils.timezone import now, make_aware, localtime |
10 |
from django.core.management import call_command |
|
11 |
from django.core.management.base import CommandError |
|
5 | 12 | |
6 | 13 |
from chrono.agendas.models import (Agenda, Event, Booking, MeetingType, |
7 | 14 |
Desk, TimePeriodException, ICSError) |
... | ... | |
22 | 29 |
BEGIN:VEVENT |
23 | 30 |
DTSTAMP:20170824T092855Z |
24 | 31 |
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4 |
25 |
DTSTART:20170831T180800Z
|
|
26 |
DTEND:20170831T213400Z
|
|
32 |
DTSTART:20170830T180800Z
|
|
33 |
DTEND:20170831T223400Z
|
|
27 | 34 |
SEQUENCE:2 |
28 | 35 |
SUMMARY:Event 2 |
29 | 36 |
END:VEVENT |
... | ... | |
43 | 50 |
DTSTAMP:20170824T082855Z |
44 | 51 |
DTSTART:20180101 |
45 | 52 |
DTEND:20180101 |
46 |
SUMMARY:New eve
|
|
53 |
SUMMARY:New Year's Eve
|
|
47 | 54 |
RRULE:FREQ=YEARLY |
48 | 55 |
END:VEVENT |
49 | 56 |
END:VCALENDAR""" |
... | ... | |
220 | 227 |
with pytest.raises(ICSError) as e: |
221 | 228 |
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS) |
222 | 229 |
assert str(e.value) == "The file doesn't contain any events." |
230 | ||
231 |
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_remote_ics(mocked_get):
    """Remote sync creates, then relabels, then removes the desk exceptions."""
    agenda = Agenda(label=u'Test 8 agenda')
    agenda.save()
    desk = Desk(label='Test 8 desk', agenda=agenda)
    desk.save()
    # exceptions coming from a remote calendar carry this external id prefix
    remote_prefix = 'desk-%s:' % desk.id
    mocked_response = mock.Mock()
    mocked_response.text = ICS_SAMPLE
    mocked_get.return_value = mocked_response
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert exceptions_count == 2

    # same events (same UIDs) with new summaries: labels must be updated.
    # No flag is needed as the pattern has no anchor; the fourth positional
    # argument of re.sub() is `count`, not `flags`.
    mocked_response.text = re.sub(r'SUMMARY:\w+', 'SUMMARY:New summary', ICS_SAMPLE)
    mocked_get.return_value = mocked_response
    desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    synced = TimePeriodException.objects.filter(external_id__startswith=remote_prefix, desk=desk)
    # make sure the loop below really checks something
    assert synced.count() == 2
    for timeperiod in synced:
        assert 'New summary' in timeperiod.label

    # an empty remote calendar removes the exceptions it previously provided
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
    mocked_get.return_value = mocked_response
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert exceptions_count == 0
    assert TimePeriodException.objects.filter(external_id__startswith=remote_prefix).count() == 0
|
253 | ||
254 |
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
    """A connection failure is reported as an ICSError."""
    agenda = Agenda(label=u'Test 9 agenda')
    agenda.save()
    desk = Desk(label='Test 9 desk', agenda=agenda)
    desk.save()
    # an exception instance used as side_effect is raised on every call
    mocked_get.side_effect = requests.ConnectionError('unreachable')

    with pytest.raises(ICSError) as e:
        desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert str(e.value) == "Failed to retrieve remote calendar (unreachable)."
|
269 | ||
270 |
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
    """An HTTP error is reported as an ICSError mentioning the status code."""
    agenda = Agenda(label=u'Test 10 agenda')
    agenda.save()
    desk = Desk(label='Test 10 desk', agenda=agenda)
    desk.save()
    forbidden_response = mock.Mock(status_code=403)
    # an exception instance used as side_effect is raised on every call
    mocked_get.side_effect = requests.HTTPError(response=forbidden_response)

    with pytest.raises(ICSError) as e:
        desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert str(e.value) == "Failed to retrieve remote calendar (HTTP error 403)."
|
286 | ||
287 |
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, caplog):
    """The sync command fails with a CommandError naming the failing desk."""
    agenda = Agenda(label=u'Test 11 agenda')
    agenda.save()
    # well-formed URL (the scheme must be followed by "//")
    desk = Desk(label='Test 11 desk', agenda=agenda,
                timeperiod_exceptions_remote_url='http://example.com/sample.ics')
    desk.save()
    mocked_response = mock.Mock()
    mocked_response.status_code = 403
    mocked_get.return_value = mocked_response

    def mocked_requests_http_forbidden_error(*args, **kwargs):
        raise requests.HTTPError(response=mocked_response)
    mocked_get.side_effect = mocked_requests_http_forbidden_error

    with pytest.raises(CommandError) as e:
        call_command('sync_desks_timeperiod_exceptions')
    assert str(e.value) == 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (HTTP error 403).'
|
302 | ||
303 |
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
    """Successive runs of the sync command follow the remote calendar content."""
    agenda = Agenda(label=u'Test 11 agenda')
    agenda.save()
    # well-formed URL (the scheme must be followed by "//")
    desk = Desk(label='Test 11 desk', agenda=agenda,
                timeperiod_exceptions_remote_url='http://example.com/sample.ics')
    desk.save()
    mocked_response = mock.Mock()
    mocked_response.text = ICS_SAMPLE
    mocked_get.return_value = mocked_response
    call_command('sync_desks_timeperiod_exceptions')
    assert TimePeriodException.objects.filter(desk=desk).count() == 2

    # the calendar now holds a single, unrelated event: the two previous
    # exceptions must be replaced by this one
    mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20180824T082855Z
UID:new-and-unique-uid
DTSTART:20180831T170800Z
DTEND:20180831T203400Z
SUMMARY:Wonderfull event
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = mocked_response
    call_command('sync_desks_timeperiod_exceptions')
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
    exception = TimePeriodException.objects.get(desk=desk)
    assert exception.external_id == 'desk-%s:new-and-unique-uid' % desk.id

    # a calendar without events leaves no synchronized exception behind
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
    mocked_get.return_value = mocked_response
    call_command('sync_desks_timeperiod_exceptions')
    assert not TimePeriodException.objects.filter(desk=desk, external_id__startswith='desk-%s:' % desk.id).exists()
tests/test_manager.py | ||
---|---|---|
3 | 3 |
from django.contrib.auth.models import User, Group |
4 | 4 |
from django.utils.timezone import make_aware, now, localtime |
5 | 5 |
import datetime |
6 |
import mock |
|
6 | 7 |
import pytest |
8 |
import requests |
|
7 | 9 |
from webtest import TestApp, Upload |
8 | 10 | |
9 | 11 |
from chrono.wsgi import application |
... | ... | |
699 | 701 |
assert 'Desk A' in resp.text |
700 | 702 |
assert 'Desk B' in resp.text |
701 | 703 | |
702 | ||
703 | 704 |
def test_meetings_agenda_delete_desk(app, admin_user): |
704 | 705 |
app = login(app) |
705 | 706 |
resp = app.get('/manage/', status=200) |
... | ... | |
840 | 841 |
resp = app.get('/manage/agendas/%d/' % agenda.pk) |
841 | 842 |
assert 'Import exceptions from .ics' in resp.content |
842 | 843 |
resp = resp.click('upload') |
844 |
assert "You can upload a file or specify an address to a remote calendar." in resp |
|
845 |
resp = resp.form.submit(status=302) |
|
846 |
resp = app.get('/manage/agendas/%d/' % agenda.pk) |
|
847 |
resp = resp.click('upload') |
|
843 | 848 |
resp.form['ics_file'] = Upload('exceptions.ics', 'invalid content', 'text/calendar') |
844 | 849 |
resp = resp.form.submit(status=200) |
845 | 850 |
assert 'File format is invalid' in resp.content |
... | ... | |
849 | 854 |
BEGIN:VEVENT |
850 | 855 |
DTSTART:20180101 |
851 | 856 |
DTEND:20180101 |
852 |
SUMMARY:New eve
|
|
857 |
SUMMARY:New Year's Eve
|
|
853 | 858 |
RRULE:FREQ=YEARLY |
854 | 859 |
END:VEVENT |
855 | 860 |
END:VCALENDAR""" |
... | ... | |
861 | 866 |
PRODID:-//foo.bar//EN |
862 | 867 |
BEGIN:VEVENT |
863 | 868 |
DTEND:20180101 |
864 |
SUMMARY:New eve
|
|
869 |
SUMMARY:New Year's Eve
|
|
865 | 870 |
END:VEVENT |
866 | 871 |
END:VCALENDAR""" |
867 | 872 |
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_start_date, 'text/calendar') |
868 | 873 |
resp = resp.form.submit(status=200) |
869 |
assert 'Event "New eve" has no start date.' in resp.content
|
|
874 |
assert 'Event "New Year's Eve" has no start date.' in resp.content
|
|
870 | 875 |
ics_with_no_events = """BEGIN:VCALENDAR |
871 | 876 |
VERSION:2.0 |
872 | 877 |
PRODID:-//foo.bar//EN |
... | ... | |
881 | 886 |
BEGIN:VEVENT |
882 | 887 |
DTSTART:20180101 |
883 | 888 |
DTEND:20180101 |
884 |
SUMMARY:New eve
|
|
889 |
SUMMARY:New Year's Eve
|
|
885 | 890 |
END:VEVENT |
886 | 891 |
END:VCALENDAR""" |
892 |
resp = app.get('/manage/agendas/%d/' % agenda.pk) |
|
893 |
resp = resp.click('upload') |
|
887 | 894 |
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar') |
888 | 895 |
resp = resp.form.submit(status=302) |
889 |
assert TimePeriodException.objects.count() == 1 |
|
896 |
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
|
890 | 897 |
resp = resp.follow() |
891 | 898 |
assert 'An exception has been imported.' in resp.content |
899 | ||
900 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_with_remote_ics(mocked_get, app, admin_user):
    """Submitting a URL imports its events; submitting an empty URL drops them."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    agenda_url = '/manage/agendas/%d/' % agenda.pk
    login(app)
    # the import link is not shown while the desk has no time period
    agenda_page = app.get(agenda_url)
    assert 'Import exceptions from .ics' not in agenda_page.content

    TimePeriod.objects.create(weekday=1, desk=desk,
                              start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    import_page = app.get(agenda_url).click('upload')

    assert 'ics_file' in import_page.form.fields
    assert 'ics_url' in import_page.form.fields
    import_page.form['ics_url'] = 'http://example.com/foo.ics'
    remote_calendar = mock.Mock()
    remote_calendar.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:random-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = remote_calendar
    import_page.form.submit(status=302)
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
    imported = TimePeriodException.objects.get(desk=desk)
    assert imported.external_id == 'desk-%s:random-event-id' % desk.id

    # clearing the URL removes the exception that came from the calendar
    import_page = app.get(agenda_url).click('upload')
    import_page.form['ics_url'] = ''
    import_page.form.submit(status=302)
    assert not TimePeriodException.objects.filter(desk=desk,
                                                  external_id='desk-%s:random-event-id' % desk.id).exists()
|
940 | ||
941 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_with_remote_ics_no_events(mocked_get, app, admin_user):
    """A remote calendar that becomes empty removes the imported exception."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    agenda_url = '/manage/agendas/%d/' % agenda.pk
    login(app)
    # the import link is not shown while the desk has no time period
    agenda_page = app.get(agenda_url)
    assert 'Import exceptions from .ics' not in agenda_page.content

    TimePeriod.objects.create(weekday=1, desk=desk,
                              start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    import_page = app.get(agenda_url).click('upload')
    import_page.form['ics_url'] = 'http://example.com/foo.ics'
    remote_calendar = mock.Mock()
    remote_calendar.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:random-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = remote_calendar
    import_page.form.submit(status=302)
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
    imported = TimePeriodException.objects.get(desk=desk)
    assert imported.external_id == 'desk-%s:random-event-id' % desk.id

    # same URL (prefilled in the form), but the calendar has no event anymore
    remote_calendar.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
END:VCALENDAR"""
    import_page = app.get(agenda_url).click('upload')
    import_page.form.submit(status=302)
    assert not TimePeriodException.objects.filter(desk=desk,
                                                  external_id='desk-%s:random-event-id' % desk.id).exists()
|
981 | ||
982 | ||
983 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
    """Re-importing a calendar keeps the events still present, drops the others."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    login(app)
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
    assert 'Import exceptions from .ics' not in resp.content

    TimePeriod.objects.create(weekday=1, desk=desk,
                              start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    resp = app.get('/manage/agendas/%d/' % agenda.pk)
    resp = resp.click('upload')
    resp.form['ics_url'] = 'http://example.com/foo.ics'
    mocked_response = mock.Mock()
    mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:first-eventrandom-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:First test event
END:VEVENT
BEGIN:VEVENT
UID:second-eventrandom-event-id
DTSTART:20190101
DTEND:20190101
SUMMARY:Second test event
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = mocked_response
    resp = resp.form.submit(status=302)
    assert TimePeriodException.objects.filter(desk=desk).count() == 2

    # second import: only the second event is left in the calendar. Its UID
    # is the same as in the first import so that the existing exception is
    # updated rather than replaced by a new one.
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
    resp = resp.click('upload')
    resp.form['ics_url'] = 'http://example.com/foo.ics'
    mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:second-eventrandom-event-id
DTSTART:20190101
DTEND:20190101
SUMMARY:Second test event
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = mocked_response
    resp = resp.form.submit(status=302)
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
    remaining = TimePeriodException.objects.get(desk=desk)
    assert remaining.external_id == 'desk-%s:second-eventrandom-event-id' % desk.id
|
1034 | ||
1035 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_remote_ics_with_connection_error(mocked_get, app, admin_user):
    """A connection failure redisplays the form with an error message."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    agenda_url = '/manage/agendas/%d/' % agenda.pk
    login(app)
    # the import link is not shown while the desk has no time period
    agenda_page = app.get(agenda_url)
    assert 'Import exceptions from .ics' not in agenda_page.content

    TimePeriod.objects.create(weekday=1, desk=desk,
                              start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    import_page = app.get(agenda_url).click('upload')

    assert 'ics_file' in import_page.form.fields
    assert 'ics_url' in import_page.form.fields
    import_page.form['ics_url'] = 'http://example.com/foo.ics'
    # an exception instance used as side_effect is raised on every call
    mocked_get.side_effect = requests.exceptions.ConnectionError('unreachable')
    error_page = import_page.form.submit(status=200)
    assert 'Failed to retrieve remote calendar (unreachable).' in error_page.content
|
1060 | ||
1061 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_forbidden_remote_ics(mocked_get, app, admin_user):
    """An HTTP 403 from the remote server redisplays the form with an error."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    agenda_url = '/manage/agendas/%d/' % agenda.pk
    login(app)
    # the import link is not shown while the desk has no time period
    agenda_page = app.get(agenda_url)
    assert 'Import exceptions from .ics' not in agenda_page.content

    TimePeriod.objects.create(weekday=1, desk=desk,
                              start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    import_page = app.get(agenda_url).click('upload')
    import_page.form['ics_url'] = 'http://example.com/foo.ics'
    forbidden_response = mock.Mock(status_code=403)
    # an exception instance used as side_effect is raised on every call
    mocked_get.side_effect = requests.exceptions.HTTPError(response=forbidden_response)
    error_page = import_page.form.submit(status=200)
    assert 'Failed to retrieve remote calendar (HTTP error 403).' in error_page.content
|
1084 | ||
1085 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_remote_ics_with_ssl_error(mocked_get, app, admin_user):
    """An SSL failure redisplays the form with an error message."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    agenda_url = '/manage/agendas/%d/' % agenda.pk
    login(app)
    # the import link is not shown while the desk has no time period
    agenda_page = app.get(agenda_url)
    assert 'Import exceptions from .ics' not in agenda_page.content
    TimePeriod.objects.create(weekday=1, desk=desk,
                              start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    import_page = app.get(agenda_url).click('upload')
    import_page.form['ics_url'] = 'https://example.com/foo.ics'
    # an exception instance used as side_effect is raised on every call
    mocked_get.side_effect = requests.exceptions.SSLError('SSL error')
    error_page = import_page.form.submit(status=200)
    assert 'Failed to retrieve remote calendar (SSL error).' in error_page.content
|
892 |
- |