0001-agenda-add-support-for-remote-calendar-file-with-exc.patch
chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py | ||
---|---|---|
1 |
# chrono - agendas system |
|
2 |
# Copyright (C) 2016-2017 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import six |
|
18 |
import sys |
|
19 | ||
20 |
from chrono.agendas.models import Desk, ICSError |
|
21 |
from django.core.management.base import BaseCommand, CommandError |
|
22 | ||
23 | ||
24 |
class Command(BaseCommand):
    """Refresh time period exceptions of every desk bound to a remote ICS calendar."""
    help = 'Synchronize time period exceptions from desks remote ics'

    def handle(self, **options):
        """Synchronise each desk that has a remote calendar URL.

        A failure on one desk is reported on stderr and does not prevent the
        remaining desks from being processed.
        """
        for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
            try:
                desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
            except ICSError as e:
                # go through the command's own stderr wrapper rather than the
                # Python 2-only "print >>" statement: it copes with non-ASCII
                # labels/messages and honours a stderr passed to call_command()
                self.stderr.write(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e))
chrono/agendas/migrations/0020_auto_20171102_1021.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
from __future__ import unicode_literals |
|
3 | ||
4 |
from django.db import migrations, models |
|
5 |
import datetime |
|
6 |
from django.utils.timezone import utc |
|
7 | ||
8 | ||
9 |
class Migration(migrations.Migration):
    # Adds the columns used to keep desk exceptions in sync with a remote calendar.

    dependencies = [
        ('agendas', '0019_timeperiodexception'),
    ]

    operations = [
        # desk: URL the time period exceptions are fetched from (may be blank)
        migrations.AddField(
            model_name='desk',
            name='timeperiod_exceptions_remote_url',
            field=models.URLField(verbose_name='URL to fetch time period exceptions from', blank=True),
        ),
        # exception: external identifier (may be blank)
        migrations.AddField(
            model_name='timeperiodexception',
            name='external_id',
            field=models.CharField(max_length=256, verbose_name='External ID', blank=True),
        ),
        # exception: timestamp refreshed on every save (auto_now); the fixed
        # datetime is a one-off default and is not preserved (preserve_default=False)
        migrations.AddField(
            model_name='timeperiodexception',
            name='update_datetime',
            field=models.DateTimeField(default=datetime.datetime(2017, 11, 2, 10, 21, 1, 826837, tzinfo=utc), auto_now=True),
            preserve_default=False,
        ),
    ]
chrono/agendas/models.py | ||
---|---|---|
16 | 16 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | 17 | |
18 | 18 |
import datetime |
19 |
import requests |
|
19 | 20 |
import vobject |
20 | 21 | |
21 | 22 |
from django.contrib.auth.models import Group |
... | ... | |
28 | 29 |
from django.utils.encoding import force_text |
29 | 30 |
from django.utils.formats import date_format, get_format |
30 | 31 |
from django.utils.text import slugify |
31 |
from django.utils.timezone import localtime, now, make_aware, make_naive |
|
32 |
from django.utils.timezone import localtime, now, make_aware, make_naive, is_aware
|
|
32 | 33 |
from django.utils.translation import ugettext_lazy as _ |
33 | 34 | |
34 | 35 |
from jsonfield import JSONField |
... | ... | |
358 | 359 |
agenda = models.ForeignKey(Agenda) |
359 | 360 |
label = models.CharField(_('Label'), max_length=150) |
360 | 361 |
slug = models.SlugField(_('Identifier'), max_length=150) |
362 |
timeperiod_exceptions_remote_url = models.URLField(_('URL to fetch time period exceptions from'), |
|
363 |
blank=True) |
|
361 | 364 | |
362 | 365 |
def __unicode__(self): |
363 | 366 |
return self.label |
... | ... | |
417 | 420 |
in_two_weeks = self.get_exceptions_within_two_weeks() |
418 | 421 |
return self.timeperiodexception_set.count() == len(in_two_weeks) |
419 | 422 | |
420 |
def create_timeperiod_exceptions_from_ics(self, data): |
|
423 |
def create_timeperiod_exceptions_from_remote_ics(self, url, timeout=30):
    """Download the calendar at ``url`` and synchronise this desk with it.

    The downloaded text is handed to ``create_timeperiod_exceptions_from_ics``
    with ``keep_synced_by_uid=True``.

    :param url: address of the remote ICS calendar.
    :param timeout: seconds to wait for the remote server; without it
        ``requests`` may block forever, which would stall the hourly
        synchronisation or a manager request.
    :returns: whatever ``create_timeperiod_exceptions_from_ics`` returns
        (the number of exceptions created).
    :raises ICSError: if the calendar cannot be retrieved (HTTP error status,
        connection failure, timeout, ...) or cannot be imported.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.HTTPError as e:
        raise ICSError(_('Failed to retrieve remote calendar (HTTP error %s).') % e.response.status_code)
    except requests.RequestException as e:
        # covers connection errors, SSL errors and timeouts
        raise ICSError(_('Failed to retrieve remote calendar (%s).') % e)

    return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)
|
433 | ||
434 |
def remove_timeperiod_exceptions_from_remote_ics(self):
    """Delete the exceptions of this desk that carry an external id.

    Exceptions with an empty ``external_id`` and exceptions belonging to
    other desks are left untouched.
    """
    # filter on the desk first: exclude(desk=self, external_id='') negates the
    # AND of both conditions and would therefore also delete every exception
    # of every other desk
    TimePeriodException.objects.filter(desk=self).exclude(external_id='').delete()
|
436 | ||
437 |
def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False): |
|
421 | 438 |
try: |
422 | 439 |
parsed = vobject.readOne(data) |
423 | 440 |
except vobject.base.ParseError: |
... | ... | |
425 | 442 | |
426 | 443 |
total_created = 0 |
427 | 444 | |
428 |
if not parsed.contents.get('vevent'): |
|
445 |
if not parsed.contents.get('vevent') and not keep_synced_by_uid:
|
|
429 | 446 |
raise ICSError(_('The file doesn\'t contain any events.')) |
430 | 447 | |
431 | 448 |
with transaction.atomic(): |
432 |
for vevent in parsed.contents['vevent']: |
|
449 |
update_datetime = now() |
|
450 |
for vevent in parsed.contents.get('vevent', []): |
|
433 | 451 |
event = {} |
452 | ||
434 | 453 |
summary = vevent.contents['summary'][0].value |
435 | 454 |
if not isinstance(summary, unicode): |
436 | 455 |
summary = unicode(summary, 'utf-8') |
... | ... | |
441 | 460 |
if not isinstance(start_dt, datetime.datetime): |
442 | 461 |
start_dt = datetime.datetime.combine(start_dt, |
443 | 462 |
datetime.datetime.min.time()) |
444 |
event['start_datetime'] = start_dt |
|
463 |
if not is_aware(start_dt): |
|
464 |
event['start_datetime'] = make_aware(start_dt) |
|
465 |
else: |
|
466 |
event['start_datetime'] = start_dt |
|
445 | 467 |
except AttributeError: |
446 | 468 |
raise ICSError(_('Event "%s" has no start date.') % summary) |
447 | 469 |
try: |
... | ... | |
452 | 474 |
except AttributeError: |
453 | 475 |
# events without end date are considered as ending the same day |
454 | 476 |
end_dt = datetime.datetime.combine(start_dt, datetime.datetime.max.time()) |
455 |
event['end_datetime'] = end_dt |
|
456 | ||
457 |
obj, created = TimePeriodException.objects.get_or_create(desk=self, label=summary, |
|
458 |
**event) |
|
477 |
if not is_aware(end_dt): |
|
478 |
event['end_datetime'] = make_aware(end_dt) |
|
479 |
else: |
|
480 |
event['end_datetime'] = end_dt |
|
481 |
if keep_synced_by_uid: |
|
482 |
external_id = vevent.contents['uid'][0].value |
|
483 |
event['label'] = summary |
|
484 |
obj, created = TimePeriodException.objects.update_or_create(desk=self, external_id=external_id, |
|
485 |
defaults=event) |
|
486 |
else: |
|
487 |
obj, created = TimePeriodException.objects.update_or_create(desk=self, label=summary, defaults=event) |
|
488 |
# return total_created |
|
459 | 489 |
if created: |
460 | 490 |
total_created += 1 |
461 | 491 | |
492 |
if keep_synced_by_uid: |
|
493 |
# delete all outdated exceptions from remote calendar |
|
494 |
TimePeriodException.objects.filter(update_datetime__lt=update_datetime, |
|
495 |
desk=self).exclude(external_id='').delete() |
|
496 | ||
462 | 497 |
return total_created |
463 | 498 | |
464 | 499 | |
465 | 500 |
class TimePeriodException(models.Model): |
466 | 501 |
desk = models.ForeignKey(Desk) |
502 |
external_id = models.CharField(_('External ID'), max_length=256, blank=True) |
|
467 | 503 |
label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True) |
468 | 504 |
start_datetime = models.DateTimeField(_('Exception start time')) |
469 | 505 |
end_datetime = models.DateTimeField(_('Exception end time')) |
506 |
update_datetime = models.DateTimeField(auto_now=True) |
|
470 | 507 | |
471 | 508 |
class Meta: |
472 | 509 |
ordering = ['start_datetime'] |
chrono/manager/forms.py | ||
---|---|---|
83 | 83 |
widgets = { |
84 | 84 |
'agenda': forms.HiddenInput(), |
85 | 85 |
} |
86 |
exclude = ['slug'] |
|
86 |
exclude = ['slug', 'timeperiod_exceptions_remote_url']
|
|
87 | 87 | |
88 | 88 | |
89 | 89 |
class DeskForm(forms.ModelForm): |
... | ... | |
92 | 92 |
widgets = { |
93 | 93 |
'agenda': forms.HiddenInput(), |
94 | 94 |
} |
95 |
exclude = [] |
|
95 |
exclude = ['timeperiod_exceptions_remote_url']
|
|
96 | 96 | |
97 | 97 | |
98 | 98 |
class TimePeriodExceptionForm(forms.ModelForm): |
... | ... | |
170 | 170 |
model = Desk |
171 | 171 |
fields = [] |
172 | 172 | |
173 |
ics_file = forms.FileField(label=_('ICS File'), |
|
173 |
ics_file = forms.FileField(label=_('ICS File'), required=False,
|
|
174 | 174 |
help_text=_('ICS file containing events which will be considered as exceptions')) |
175 |
ics_url = forms.URLField(label=_('URL'), required=False, |
|
176 |
help_text=_('URL to remote calendar which will be synchronised hourly')) |
chrono/manager/templates/chrono/manager_import_exceptions.html | ||
---|---|---|
13 | 13 |
{% block content %} |
14 | 14 | |
15 | 15 |
<form method="post" enctype="multipart/form-data"> |
16 |
<p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p> |
|
16 | 17 |
{% csrf_token %} |
17 | 18 |
{{ form.as_p }} |
18 | 19 |
<p> |
chrono/manager/views.py | ||
---|---|---|
394 | 394 |
form_class = ExceptionsImportForm |
395 | 395 |
template_name = 'chrono/manager_import_exceptions.html' |
396 | 396 | |
397 |
def get_initial(self):
    """Pre-fill the URL field with the remote calendar URL stored on the desk."""
    desk = self.get_object()
    return {'ics_url': desk.timeperiod_exceptions_remote_url}
|
399 | ||
397 | 400 |
def form_valid(self, form):
    """Import exceptions from the uploaded file or the remote URL.

    An uploaded file takes precedence over the URL; when neither is given the
    exceptions coming from a remote calendar are removed. The submitted URL is
    then stored on the desk. Any ICSError is shown as a non-field form error.
    """
    desk = form.instance
    cleaned = form.cleaned_data
    exceptions = None
    try:
        if cleaned['ics_file']:
            exceptions = desk.create_timeperiod_exceptions_from_ics(cleaned['ics_file'])
        elif cleaned['ics_url']:
            exceptions = desk.create_timeperiod_exceptions_from_remote_ics(cleaned['ics_url'])
        else:
            desk.remove_timeperiod_exceptions_from_remote_ics()
    except ICSError as e:
        form.add_error(None, unicode(e))
        return self.form_invalid(form)

    desk.timeperiod_exceptions_remote_url = cleaned['ics_url']
    desk.save()

    # no message when exceptions were only removed
    if exceptions is not None:
        template = ungettext('An exception has been imported.',
                             '%(count)d exceptions have been imported.', exceptions)
        messages.info(self.request, template % {'count': exceptions})
    return super(DeskImportTimePeriodExceptionsView, self).form_valid(form)
408 | 420 | |
409 | 421 |
desk_import_time_period_exceptions = DeskImportTimePeriodExceptionsView.as_view() |
debian/chrono.cron.d | ||
---|---|---|
1 |
MAILTO=root |
|
2 | ||
3 |
0 * * * * /sbin/runuser -u chrono /usr/bin/chrono-manage -- tenant_command sync_desks_timeperiod_exceptions --all-tenants |
debian/control | ||
---|---|---|
11 | 11 |
Depends: ${misc:Depends}, ${python:Depends}, |
12 | 12 |
python-django (>= 1.8), |
13 | 13 |
python-gadjo, |
14 |
python-intervaltree |
|
14 |
python-intervaltree, |
|
15 |
python-requests |
|
15 | 16 |
Recommends: python-django-mellon |
16 | 17 |
Description: Agendas System (Python module) |
17 | 18 |
requirements.txt | ||
---|---|---|
3 | 3 |
djangorestframework>=3.1, <3.7 |
4 | 4 |
django-jsonfield >= 0.9.3 |
5 | 5 |
intervaltree |
6 |
requests |
|
7 |
vobject |
setup.py | ||
---|---|---|
107 | 107 |
'djangorestframework>=3.1, <3.7', |
108 | 108 |
'django-jsonfield >= 0.9.3', |
109 | 109 |
'intervaltree', |
110 |
'vobject' |
|
110 |
'vobject', |
|
111 |
'requests' |
|
111 | 112 |
], |
112 | 113 |
zip_safe=False, |
113 | 114 |
cmdclass={ |
tests/test_agendas.py | ||
---|---|---|
1 | 1 |
import pytest |
2 | 2 |
import datetime |
3 |
import mock |
|
4 |
import re |
|
5 |
import requests |
|
6 | ||
3 | 7 | |
4 | 8 |
from django.utils.timezone import now, make_aware, localtime |
9 |
from django.core.management import call_command |
|
10 |
from django.core.management.base import CommandError |
|
5 | 11 | |
6 | 12 |
from chrono.agendas.models import (Agenda, Event, Booking, MeetingType, |
7 | 13 |
Desk, TimePeriodException, ICSError) |
... | ... | |
22 | 28 |
BEGIN:VEVENT |
23 | 29 |
DTSTAMP:20170824T092855Z |
24 | 30 |
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4 |
25 |
DTSTART:20170831T180800Z
|
|
26 |
DTEND:20170831T213400Z
|
|
31 |
DTSTART:20170830T180800Z
|
|
32 |
DTEND:20170831T223400Z
|
|
27 | 33 |
SEQUENCE:2 |
28 | 34 |
SUMMARY:Event 2 |
29 | 35 |
END:VEVENT |
... | ... | |
43 | 49 |
DTSTAMP:20170824T082855Z |
44 | 50 |
DTSTART:20180101 |
45 | 51 |
DTEND:20180101 |
46 |
SUMMARY:New eve
|
|
52 |
SUMMARY:New Year's Eve
|
|
47 | 53 |
RRULE:FREQ=YEARLY |
48 | 54 |
END:VEVENT |
49 | 55 |
END:VCALENDAR""" |
... | ... | |
220 | 226 |
with pytest.raises(ICSError) as e: |
221 | 227 |
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS) |
222 | 228 |
assert str(e.value) == "The file doesn't contain any events." |
229 | ||
230 |
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_remote_ics(mocked_get):
    """Remote import creates exceptions, then updates and removes them on re-sync."""
    agenda = Agenda(label=u'Test 8 agenda')
    agenda.save()
    desk = Desk(label='Test 8 desk', agenda=agenda)
    desk.save()
    mocked_response = mock.Mock()
    mocked_response.text = ICS_SAMPLE
    mocked_get.return_value = mocked_response
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert exceptions_count == 2

    # same events (same UIDs) with new summaries: they must be updated in place.
    # No flag is needed (the pattern has no anchors); the fourth positional
    # argument of re.sub() is "count", not "flags".
    mocked_response.text = re.sub(r'SUMMARY:\w+', 'SUMMARY:New summary', ICS_SAMPLE)
    desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    synced = TimePeriodException.objects.filter(desk=desk)
    assert synced.count() == 2
    for timeperiod in synced:
        assert 'New summary' in timeperiod.label

    # a calendar without events empties the desk
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert exceptions_count == 0
    assert TimePeriodException.objects.filter(desk=desk).count() == 0
|
252 | ||
253 |
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get):
    """A connection failure while fetching the calendar surfaces as an ICSError."""
    agenda = Agenda.objects.create(label=u'Test 9 agenda')
    desk = Desk.objects.create(label='Test 9 desk', agenda=agenda)
    # the mock raises on every call, so no response needs to be prepared
    mocked_get.side_effect = requests.ConnectionError('unreachable')
    with pytest.raises(ICSError) as excinfo:
        desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert str(excinfo.value) == "Failed to retrieve remote calendar (unreachable)."
|
268 | ||
269 |
@mock.patch('chrono.agendas.models.requests.get')
def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get):
    """An HTTP error status is reported through an ICSError carrying the code."""
    agenda = Agenda.objects.create(label=u'Test 10 agenda')
    desk = Desk.objects.create(label='Test 10 desk', agenda=agenda)
    forbidden = mock.Mock()
    forbidden.status_code = 403
    # the error exposes the 403 response, as requests does on raise_for_status()
    mocked_get.side_effect = requests.HTTPError(response=forbidden)

    with pytest.raises(ICSError) as excinfo:
        desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
    assert str(excinfo.value) == "Failed to retrieve remote calendar (HTTP error 403)."
|
285 | ||
286 |
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
    """The sync command reports a failing desk on stderr instead of crashing."""
    agenda = Agenda(label=u'Test 11 agenda')
    agenda.save()
    # well-formed URL (the scheme separator was missing its slashes)
    desk = Desk(label='Test 11 desk', agenda=agenda,
                timeperiod_exceptions_remote_url='http://example.com/sample.ics')
    desk.save()
    mocked_response = mock.Mock()
    mocked_response.status_code = 403
    mocked_get.return_value = mocked_response

    def mocked_requests_http_forbidden_error(*args, **kwargs):
        raise requests.HTTPError(response=mocked_response)
    mocked_get.side_effect = mocked_requests_http_forbidden_error
    call_command('sync_desks_timeperiod_exceptions')
    out, err = capsys.readouterr()
    assert err == 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (HTTP error 403).\n'
|
301 | ||
302 |
@mock.patch('chrono.agendas.models.requests.get')
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
    """Successive syncs follow the remote calendar: replace, then remove, events."""
    agenda = Agenda(label=u'Test 11 agenda')
    agenda.save()
    # well-formed URL (the scheme separator was missing its slashes)
    desk = Desk(label='Test 11 desk', agenda=agenda,
                timeperiod_exceptions_remote_url='http://example.com/sample.ics')
    desk.save()
    mocked_response = mock.Mock()
    mocked_response.text = ICS_SAMPLE
    mocked_get.return_value = mocked_response
    call_command('sync_desks_timeperiod_exceptions')
    assert TimePeriodException.objects.filter(desk=desk).count() == 2

    # the calendar now holds a single event with a UID unknown so far
    mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
DTSTAMP:20180824T082855Z
UID:new-and-unique-uid
DTSTART:20180831T170800Z
DTEND:20180831T203400Z
SUMMARY:Wonderfull event
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = mocked_response
    call_command('sync_desks_timeperiod_exceptions')
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
    exception = TimePeriodException.objects.get(desk=desk)
    assert exception.external_id == 'new-and-unique-uid'

    # an empty calendar removes what was imported
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
    mocked_get.return_value = mocked_response
    call_command('sync_desks_timeperiod_exceptions')
    assert not TimePeriodException.objects.filter(desk=desk).exists()
tests/test_manager.py | ||
---|---|---|
3 | 3 |
from django.contrib.auth.models import User, Group |
4 | 4 |
from django.utils.timezone import make_aware, now, localtime |
5 | 5 |
import datetime |
6 |
import mock |
|
6 | 7 |
import pytest |
8 |
import requests |
|
7 | 9 |
from webtest import TestApp, Upload |
8 | 10 | |
9 | 11 |
from chrono.wsgi import application |
... | ... | |
840 | 842 |
resp = app.get('/manage/agendas/%d/' % agenda.pk) |
841 | 843 |
assert 'Import exceptions from .ics' in resp.content |
842 | 844 |
resp = resp.click('upload') |
845 |
assert "You can upload a file or specify an address to a remote calendar." in resp |
|
846 |
resp = resp.form.submit(status=302) |
|
847 |
resp = app.get('/manage/agendas/%d/' % agenda.pk) |
|
848 |
resp = resp.click('upload') |
|
843 | 849 |
resp.form['ics_file'] = Upload('exceptions.ics', 'invalid content', 'text/calendar') |
844 | 850 |
resp = resp.form.submit(status=200) |
845 | 851 |
assert 'File format is invalid' in resp.content |
... | ... | |
849 | 855 |
BEGIN:VEVENT |
850 | 856 |
DTSTART:20180101 |
851 | 857 |
DTEND:20180101 |
852 |
SUMMARY:New eve
|
|
858 |
SUMMARY:New Year's Eve
|
|
853 | 859 |
RRULE:FREQ=YEARLY |
854 | 860 |
END:VEVENT |
855 | 861 |
END:VCALENDAR""" |
... | ... | |
861 | 867 |
PRODID:-//foo.bar//EN |
862 | 868 |
BEGIN:VEVENT |
863 | 869 |
DTEND:20180101 |
864 |
SUMMARY:New eve
|
|
870 |
SUMMARY:New Year's Eve
|
|
865 | 871 |
END:VEVENT |
866 | 872 |
END:VCALENDAR""" |
867 | 873 |
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_start_date, 'text/calendar') |
868 | 874 |
resp = resp.form.submit(status=200) |
869 |
assert 'Event "New eve" has no start date.' in resp.content
|
|
875 |
assert 'Event "New Year's Eve" has no start date.' in resp.content
|
|
870 | 876 |
ics_with_no_events = """BEGIN:VCALENDAR |
871 | 877 |
VERSION:2.0 |
872 | 878 |
PRODID:-//foo.bar//EN |
... | ... | |
881 | 887 |
BEGIN:VEVENT |
882 | 888 |
DTSTART:20180101 |
883 | 889 |
DTEND:20180101 |
884 |
SUMMARY:New eve
|
|
890 |
SUMMARY:New Year's Eve
|
|
885 | 891 |
END:VEVENT |
886 | 892 |
END:VCALENDAR""" |
893 |
resp = app.get('/manage/agendas/%d/' % agenda.pk) |
|
894 |
resp = resp.click('upload') |
|
887 | 895 |
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar') |
888 | 896 |
resp = resp.form.submit(status=302) |
889 |
assert TimePeriodException.objects.count() == 1 |
|
897 |
assert TimePeriodException.objects.filter(desk=desk).count() == 1
|
|
890 | 898 |
resp = resp.follow() |
891 | 899 |
assert 'An exception has been imported.' in resp.content |
900 | ||
901 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_with_remote_ics(mocked_get, app, admin_user):
    """Importing from a URL stores the event UID; clearing the URL removes the import."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType(agenda=agenda, label='Bar').save()
    login(app)
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
    assert 'Import exceptions from .ics' not in resp.content

    TimePeriod.objects.create(weekday=1, desk=desk,
            start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    resp = app.get('/manage/agendas/%d/' % agenda.pk)
    resp = resp.click('upload')

    assert 'ics_file' in resp.form.fields
    assert 'ics_url' in resp.form.fields
    resp.form['ics_url'] = 'http://example.com/foo.ics'
    mocked_response = mock.Mock()
    mocked_response.text = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:random-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
END:VEVENT
END:VCALENDAR"""
    mocked_get.return_value = mocked_response
    resp = resp.form.submit(status=302)
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
    exception = TimePeriodException.objects.get(desk=desk)
    assert exception.external_id == 'random-event-id'

    # submitting without file nor URL drops the remotely imported exceptions
    resp = app.get('/manage/agendas/%d/' % agenda.pk)
    resp = resp.click('upload')
    resp.form['ics_url'] = ''
    resp = resp.form.submit(status=302)
    # look for the id that was actually stored, otherwise the check is vacuous
    assert not TimePeriodException.objects.filter(desk=desk,
            external_id='random-event-id').exists()
|
941 | ||
942 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_with_remote_ics_no_events(mocked_get, app, admin_user):
    """Re-submitting once the remote calendar is empty removes the imported exception."""
    calendar_with_event = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:random-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:New Year's Eve
END:VEVENT
END:VCALENDAR"""
    calendar_without_event = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
END:VCALENDAR"""

    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType.objects.create(agenda=agenda, label='Bar')
    login(app)
    agenda_url = '/manage/agendas/%d/' % agenda.pk
    page = app.get(agenda_url)
    assert 'Import exceptions from .ics' not in page.content

    TimePeriod.objects.create(weekday=1, desk=desk,
            start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    # first import: one event
    remote = mock.Mock()
    remote.text = calendar_with_event
    mocked_get.return_value = remote
    page = app.get(agenda_url).click('upload')
    page.form['ics_url'] = 'http://example.com/foo.ics'
    page = page.form.submit(status=302)
    desk_exceptions = TimePeriodException.objects.filter(desk=desk)
    assert desk_exceptions.count() == 1
    assert TimePeriodException.objects.get(desk=desk).external_id == 'random-event-id'

    # second submission of the form as displayed, the calendar being now empty
    remote.text = calendar_without_event
    page = app.get(agenda_url).click('upload')
    page = page.form.submit(status=302)
    assert not TimePeriodException.objects.filter(desk=desk,
            external_id='random-event-id').exists()
|
982 | ||
983 | ||
984 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
    """A second import from the same URL only keeps the events still in the calendar."""
    two_events = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:first-eventrandom-event-id
DTSTART:20180101
DTEND:20180101
SUMMARY:First test event
END:VEVENT
BEGIN:VEVENT
UID:second-eventrandom-event-id
DTSTART:20190101
DTEND:20190101
SUMMARY:Second test event
END:VEVENT
END:VCALENDAR"""
    one_event = """BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//foo.bar//EN
BEGIN:VEVENT
UID:secord-eventrandom-event-id
DTSTART:20190101
DTEND:20190101
SUMMARY:Second test event
END:VEVENT
END:VCALENDAR"""

    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType.objects.create(agenda=agenda, label='Bar')
    login(app)
    agenda_url = '/manage/agendas/%d/' % agenda.pk
    page = app.get(agenda_url)
    assert 'Import exceptions from .ics' not in page.content

    TimePeriod.objects.create(weekday=1, desk=desk,
            start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    remote = mock.Mock()
    mocked_get.return_value = remote
    # import each calendar in turn and check how many exceptions remain
    for calendar, expected_count in ((two_events, 2), (one_event, 1)):
        page = app.get(agenda_url).click('upload')
        page.form['ics_url'] = 'http://example.com/foo.ics'
        remote.text = calendar
        page = page.form.submit(status=302)
        assert TimePeriodException.objects.filter(desk=desk).count() == expected_count
|
1035 | ||
1036 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_remote_ics_with_connection_error(mocked_get, app, admin_user):
    """An unreachable remote calendar is reported on the form, which is shown again."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType.objects.create(agenda=agenda, label='Bar')
    login(app)
    agenda_url = '/manage/agendas/%d/' % agenda.pk
    page = app.get(agenda_url)
    assert 'Import exceptions from .ics' not in page.content

    TimePeriod.objects.create(weekday=1, desk=desk,
            start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    page = app.get(agenda_url).click('upload')
    for field_name in ('ics_file', 'ics_url'):
        assert field_name in page.form.fields
    page.form['ics_url'] = 'http://example.com/foo.ics'
    # every call to requests.get fails before any response exists
    mocked_get.side_effect = requests.exceptions.ConnectionError('unreachable')
    page = page.form.submit(status=200)
    assert 'Failed to retrieve remote calendar (unreachable).' in page.content
|
1061 | ||
1062 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_forbidden_remote_ics(mocked_get, app, admin_user):
    """An HTTP 403 from the remote calendar is reported on the form with its status code."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType.objects.create(agenda=agenda, label='Bar')
    login(app)
    agenda_url = '/manage/agendas/%d/' % agenda.pk
    page = app.get(agenda_url)
    assert 'Import exceptions from .ics' not in page.content

    TimePeriod.objects.create(weekday=1, desk=desk,
            start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    page = app.get(agenda_url).click('upload')
    page.form['ics_url'] = 'http://example.com/foo.ics'
    forbidden = mock.Mock()
    forbidden.status_code = 403
    # the raised error carries the 403 response
    mocked_get.side_effect = requests.exceptions.HTTPError(response=forbidden)
    page = page.form.submit(status=200)
    assert 'Failed to retrieve remote calendar (HTTP error 403).' in page.content
|
1085 | ||
1086 |
@mock.patch('chrono.agendas.models.requests.get')
def test_agenda_import_time_period_exception_from_remote_ics_with_ssl_error(mocked_get, app, admin_user):
    """An SSL failure while fetching the remote calendar is reported on the form."""
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
    MeetingType.objects.create(agenda=agenda, label='Bar')
    login(app)
    agenda_url = '/manage/agendas/%d/' % agenda.pk
    page = app.get(agenda_url)
    assert 'Import exceptions from .ics' not in page.content
    TimePeriod.objects.create(weekday=1, desk=desk,
            start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))

    page = app.get(agenda_url).click('upload')
    page.form['ics_url'] = 'https://example.com/foo.ics'
    # every call to requests.get fails with an SSL error
    mocked_get.side_effect = requests.exceptions.SSLError('SSL error')
    page = page.form.submit(status=200)
    assert 'Failed to retrieve remote calendar (SSL error).' in page.content
|
892 |
- |