0002-agendas-add-exception-source-model-29209.patch
chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py | ||
---|---|---|
18 | 18 | |
19 | 19 |
import sys |
20 | 20 | |
21 |
from chrono.agendas.models import Desk, ICSError
|
|
21 |
from chrono.agendas.models import TimePeriodExceptionSource, ICSError
|
|
22 | 22 |
from django.core.management.base import BaseCommand |
23 | 23 | |
24 | 24 | |
... | ... | |
26 | 26 |
help = 'Synchronize time period exceptions from desks remote ics' |
27 | 27 | |
28 | 28 |
def handle(self, **options): |
29 |
for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
|
|
29 |
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
|
|
30 | 30 |
try: |
31 |
desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
|
|
31 |
source.desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source)
|
|
32 | 32 |
except ICSError as e: |
33 |
print(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e), file=sys.stderr) |
|
33 |
print(u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr) |
chrono/agendas/migrations/0033_timeperiodexceptionsource.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
from __future__ import unicode_literals |
|
3 | ||
4 |
from django.db import migrations, models |
|
5 |
import django.db.models.deletion |
|
6 | ||
7 | ||
8 |
class Migration(migrations.Migration): |
|
9 | ||
10 |
dependencies = [ |
|
11 |
('agendas', '0032_auto_20191127_0919'), |
|
12 |
] |
|
13 | ||
14 |
operations = [ |
|
15 |
migrations.CreateModel( |
|
16 |
name='TimePeriodExceptionSource', |
|
17 |
fields=[ |
|
18 |
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), |
|
19 |
('ics_filename', models.CharField(max_length=256, null=True)), |
|
20 |
('ics_url', models.URLField(null=True, max_length=500)), |
|
21 |
('desk', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='agendas.Desk')), |
|
22 |
], |
|
23 |
), |
|
24 |
migrations.AddField( |
|
25 |
model_name='timeperiodexception', |
|
26 |
name='source', |
|
27 |
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='agendas.TimePeriodExceptionSource'), |
|
28 |
), |
|
29 |
migrations.AlterField( |
|
30 |
model_name='desk', |
|
31 |
name='timeperiod_exceptions_remote_url', |
|
32 |
field=models.URLField(blank=True, max_length=500, null=True, verbose_name='URL to fetch time period exceptions from'), |
|
33 |
), |
|
34 |
] |
chrono/agendas/migrations/0034_initial_source.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
from __future__ import unicode_literals |
|
3 | ||
4 |
from django.db import migrations |
|
5 | ||
6 | ||
7 |
def create_source(apps, schema_editor): |
|
8 |
Desk = apps.get_model('agendas', 'Desk') |
|
9 |
TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource') |
|
10 |
for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''): |
|
11 |
# create a source for each remote url |
|
12 |
source = TimePeriodExceptionSource.objects.create( |
|
13 |
desk=desk, |
|
14 |
ics_url=desk.timeperiod_exceptions_remote_url) |
|
15 |
# clear timeperiod_exceptions_remote_url |
|
16 |
desk.timeperiod_exceptions_remote_url = None |
|
17 |
desk.save() |
|
18 |
# attach exceptions to the created source |
|
19 |
desk.timeperiodexception_set.exclude(external_id='').update(source=source) |
|
20 | ||
21 | ||
22 |
def init_remote_url(apps, schema_editor): |
|
23 |
TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource') |
|
24 |
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False): |
|
25 |
# set timeperiod_exceptions_remote_url |
|
26 |
source.desk.timeperiod_exceptions_remote_url = source.ics_url |
|
27 |
source.desk.save() |
|
28 |
# unlink exceptions |
|
29 |
source.timeperiodexception_set.update(source=None) |
|
30 |
# delete the source |
|
31 |
source.delete() |
|
32 | ||
33 | ||
34 |
class Migration(migrations.Migration): |
|
35 | ||
36 |
dependencies = [ |
|
37 |
('agendas', '0033_timeperiodexceptionsource'), |
|
38 |
] |
|
39 | ||
40 |
operations = [ |
|
41 |
migrations.RunPython(create_source, init_remote_url), |
|
42 |
] |
chrono/agendas/migrations/0035_remove_desk_timeperiod_exceptions_remote_url.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# Generated by Django 1.11.18 on 2019-12-09 14:24 |
|
3 |
from __future__ import unicode_literals |
|
4 | ||
5 |
from django.db import migrations |
|
6 | ||
7 | ||
8 |
class Migration(migrations.Migration): |
|
9 | ||
10 |
dependencies = [ |
|
11 |
('agendas', '0034_initial_source'), |
|
12 |
] |
|
13 | ||
14 |
operations = [ |
|
15 |
migrations.RemoveField( |
|
16 |
model_name='desk', |
|
17 |
name='timeperiod_exceptions_remote_url', |
|
18 |
), |
|
19 |
] |
chrono/agendas/models.py | ||
---|---|---|
436 | 436 |
agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE) |
437 | 437 |
label = models.CharField(_('Label'), max_length=150) |
438 | 438 |
slug = models.SlugField(_('Identifier'), max_length=160) |
439 |
timeperiod_exceptions_remote_url = models.URLField( |
|
440 |
_('URL to fetch time period exceptions from'), |
|
441 |
blank=True, max_length=500) |
|
442 | 439 | |
443 | 440 |
def __str__(self): |
444 | 441 |
return self.label |
... | ... | |
493 | 490 |
in_two_weeks = self.get_exceptions_within_two_weeks() |
494 | 491 |
return self.timeperiodexception_set.count() == len(in_two_weeks) |
495 | 492 | |
496 |
def create_timeperiod_exceptions_from_remote_ics(self, url):
|
|
493 |
def import_timeperiod_exceptions_from_remote_ics(self, ics_url, source=None):
|
|
497 | 494 |
try: |
498 |
response = requests.get(url, proxies=settings.REQUESTS_PROXIES) |
|
495 |
response = requests.get(ics_url, proxies=settings.REQUESTS_PROXIES)
|
|
499 | 496 |
response.raise_for_status() |
500 | 497 |
except requests.HTTPError as e: |
501 | 498 |
raise ICSError( |
502 | 499 |
_('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).') % |
503 |
{'url': url, 'status_code': e.response.status_code}) |
|
500 |
{'url': ics_url, 'status_code': e.response.status_code})
|
|
504 | 501 |
except requests.RequestException as e: |
505 | 502 |
raise ICSError( |
506 | 503 |
_('Failed to retrieve remote calendar (%(url)s, %(exception)s).') % |
507 |
{'url': url, 'exception': e}) |
|
504 |
{'url': ics_url, 'exception': e})
|
|
508 | 505 | |
509 |
return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True) |
|
506 |
if source is None: |
|
507 |
source = TimePeriodExceptionSource(desk=self, ics_url=ics_url) |
|
508 |
return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text, keep_synced_by_uid=True) |
|
510 | 509 | |
511 |
def remove_timeperiod_exceptions_from_remote_ics(self): |
|
512 |
TimePeriodException.objects.filter(desk=self).exclude(external_id='').delete() |
|
510 |
def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None): |
|
511 |
if source is None: |
|
512 |
source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name) |
|
513 |
return self._import_timeperiod_exceptions_from_ics(source=source, data=ics_file.read()) |
|
513 | 514 | |
514 |
def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False, recurring_days=600):
|
|
515 |
def _import_timeperiod_exceptions_from_ics(self, source, data, keep_synced_by_uid=False, recurring_days=600):
|
|
515 | 516 |
try: |
516 | 517 |
parsed = vobject.readOne(data) |
517 | 518 |
except vobject.base.ParseError: |
... | ... | |
523 | 524 |
raise ICSError(_('The file doesn\'t contain any events.')) |
524 | 525 | |
525 | 526 |
with transaction.atomic(): |
527 |
if source.pk is None: |
|
528 |
source.save() |
|
529 |
# delete old exceptions related to this source |
|
530 |
source.timeperiodexception_set.all().delete() |
|
531 |
# create new exceptions |
|
526 | 532 |
update_datetime = now() |
527 | 533 |
for vevent in parsed.contents.get('vevent', []): |
528 | 534 |
if 'summary' in vevent.contents: |
... | ... | |
560 | 566 | |
561 | 567 |
kwargs = {} |
562 | 568 |
kwargs['desk'] = self |
569 |
kwargs['source'] = source |
|
563 | 570 |
kwargs['recurrence_id'] = 0 |
564 | 571 |
if keep_synced_by_uid: |
565 | 572 |
kwargs['external_id'] = vevent.contents['uid'][0].value |
... | ... | |
593 | 600 |
obj, created = TimePeriodException.objects.update_or_create(defaults=event, **kwargs) |
594 | 601 |
if created: |
595 | 602 |
total_created += 1 |
596 |
# delete unseen occurrences |
|
597 |
kwargs.pop('recurrence_id', None) |
|
598 |
TimePeriodException.objects.filter(recurrence_id__gt=i, **kwargs).delete() |
|
599 | ||
600 |
if keep_synced_by_uid: |
|
601 |
# delete all outdated exceptions from remote calendar |
|
602 |
TimePeriodException.objects.filter( |
|
603 |
update_datetime__lt=update_datetime, |
|
604 |
desk=self |
|
605 |
).exclude(external_id='').delete() |
|
606 | 603 | |
607 | 604 |
return total_created |
608 | 605 | |
... | ... | |
623 | 620 |
return openslots.search(aware_date, aware_next_date) |
624 | 621 | |
625 | 622 | |
623 |
@python_2_unicode_compatible |
|
624 |
class TimePeriodExceptionSource(models.Model): |
|
625 |
desk = models.ForeignKey(Desk, on_delete=models.CASCADE) |
|
626 |
ics_filename = models.CharField(null=True, max_length=256) |
|
627 |
ics_url = models.URLField(null=True, max_length=500) |
|
628 | ||
629 |
def __str__(self): |
|
630 |
if self.ics_filename is not None: |
|
631 |
return self.ics_filename |
|
632 |
return self.ics_url |
|
633 | ||
634 | ||
626 | 635 |
@python_2_unicode_compatible |
627 | 636 |
class TimePeriodException(models.Model): |
628 | 637 |
desk = models.ForeignKey(Desk, on_delete=models.CASCADE) |
638 |
source = models.ForeignKey(TimePeriodExceptionSource, on_delete=models.CASCADE, null=True) |
|
629 | 639 |
external_id = models.CharField(_('External ID'), max_length=256, blank=True) |
630 | 640 |
label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True) |
631 | 641 |
start_datetime = models.DateTimeField(_('Exception start time')) |
chrono/manager/forms.py | ||
---|---|---|
137 | 137 |
widgets = { |
138 | 138 |
'agenda': forms.HiddenInput(), |
139 | 139 |
} |
140 |
exclude = ['slug', 'timeperiod_exceptions_remote_url']
|
|
140 |
exclude = ['slug'] |
|
141 | 141 | |
142 | 142 | |
143 | 143 |
class DeskForm(forms.ModelForm): |
... | ... | |
146 | 146 |
widgets = { |
147 | 147 |
'agenda': forms.HiddenInput(), |
148 | 148 |
} |
149 |
exclude = ['timeperiod_exceptions_remote_url']
|
|
149 |
exclude = [] |
|
150 | 150 | |
151 | 151 | |
152 | 152 |
class TimePeriodExceptionForm(forms.ModelForm): |
chrono/manager/templates/chrono/manager_import_exceptions.html | ||
---|---|---|
13 | 13 |
{% block content %} |
14 | 14 | |
15 | 15 |
<form method="post" enctype="multipart/form-data"> |
16 |
<p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p> |
|
16 |
{% if exception_sources %} |
|
17 |
<table class="main"> |
|
18 |
<thead> |
|
19 |
<tr> |
|
20 |
<th>{% trans "Exceptions" %}</th> |
|
21 |
<th></th> |
|
22 |
<th></th> |
|
23 |
</tr> |
|
24 |
</thead> |
|
25 |
<tbody> |
|
26 |
{% for object in exception_sources %} |
|
27 |
<tr> |
|
28 |
<td> |
|
29 |
<span title="{{ object }}"> |
|
30 |
{% if object.ics_filename %}{{ object|truncatechars:50 }}{% else %}<a href="{{ object }}">{{ object|truncatechars:50 }}</a>{% endif %} |
|
31 |
</span> |
|
32 |
</td> |
|
33 |
<td> |
|
34 |
<a rel="popup" href=""> |
|
35 |
{% if object.ics_filename %}{% trans "replace" %}{% else %}{% trans "refresh" %}{% endif %} |
|
36 |
</a> |
|
37 |
</td> |
|
38 |
<td><a rel="popup" href="">{% trans "remove" %}</a></td> |
|
39 |
</tr> |
|
40 |
{% endfor %} |
|
41 |
</tbody> |
|
42 |
</table> |
|
43 |
<br /> |
|
44 |
{% endif %} |
|
45 | ||
46 |
<p class="notice">{% trans "To add new exceptions, you can upload a file or specify an address to a remote calendar." %}</p> |
|
17 | 47 |
{% csrf_token %} |
18 | 48 |
{{ form.as_p }} |
19 | 49 |
<p> |
chrono/manager/views.py | ||
---|---|---|
802 | 802 |
form_class = ExceptionsImportForm |
803 | 803 |
template_name = 'chrono/manager_import_exceptions.html' |
804 | 804 | |
805 |
def get_initial(self): |
|
806 |
return {'ics_url': self.get_object().timeperiod_exceptions_remote_url} |
|
805 |
def get_context_data(self, **kwargs): |
|
806 |
context = super(DeskImportTimePeriodExceptionsView, self).get_context_data(**kwargs) |
|
807 |
context['exception_sources'] = self.get_object().timeperiodexceptionsource_set.all() |
|
808 |
return context |
|
807 | 809 | |
808 | 810 |
def form_valid(self, form): |
809 | 811 |
exceptions = None |
810 | 812 |
try: |
811 | 813 |
if form.cleaned_data['ics_file']: |
812 |
ics_file_content = force_text(form.cleaned_data['ics_file'].read()) |
|
813 |
exceptions = form.instance.create_timeperiod_exceptions_from_ics(ics_file_content) |
|
814 |
exceptions = form.instance.import_timeperiod_exceptions_from_ics_file(form.cleaned_data['ics_file']) |
|
814 | 815 |
elif form.cleaned_data['ics_url']: |
815 |
exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url'])
|
|
816 |
exceptions = form.instance.import_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url'])
|
|
816 | 817 |
else: |
817 |
form.instance.remove_timeperiod_exceptions_from_remote_ics() |
|
818 |
form.add_error(None, _('Please provide an ICS File or an URL.')) |
|
819 |
return self.form_invalid(form) |
|
818 | 820 |
except ICSError as e: |
819 | 821 |
form.add_error(None, force_text(e)) |
820 | 822 |
return self.form_invalid(form) |
821 |
form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url'] |
|
822 |
form.instance.save() |
|
823 | ||
823 | 824 |
if exceptions is not None: |
824 | 825 |
message = ungettext('An exception has been imported.', |
825 | 826 |
'%(count)d exceptions have been imported.', exceptions) |
tests/test_agendas.py | ||
---|---|---|
7 | 7 | |
8 | 8 |
from django.utils.timezone import now, make_aware, localtime |
9 | 9 |
from django.contrib.auth.models import Group |
10 |
from django.core.files.base import ContentFile |
|
10 | 11 |
from django.core.management import call_command |
11 | 12 | |
12 | 13 |
from chrono.agendas.models import ( |
13 | 14 |
Agenda, Event, Booking, MeetingType, |
14 |
Desk, TimePeriodException, ICSError) |
|
15 |
Desk, TimePeriodException, TimePeriodExceptionSource, ICSError)
|
|
15 | 16 | |
16 | 17 |
pytestmark = pytest.mark.django_db |
17 | 18 | |
... | ... | |
75 | 76 |
END:VEVENT |
76 | 77 |
END:VCALENDAR""" |
77 | 78 | |
78 |
ICS_SAMPLE_WITH_RECURRENT_EVENT_2 = """BEGIN:VCALENDAR |
|
79 |
VERSION:2.0 |
|
80 |
PRODID:-//foo.bar//EN |
|
81 |
BEGIN:VEVENT |
|
82 |
DTSTAMP:20170720T145803Z |
|
83 |
DESCRIPTION:Vacances d'ete |
|
84 |
DTSTART;VALUE=DATE:20180101 |
|
85 |
DTEND;VALUE=DATE:20180101 |
|
86 |
SUMMARY:reccurent event |
|
87 |
END:VEVENT |
|
88 |
BEGIN:VEVENT |
|
89 |
DTSTAMP:20170824T082855Z |
|
90 |
DTSTART:20180102 |
|
91 |
DTEND:20180101 |
|
92 |
SUMMARY:New Year's Eve |
|
93 |
RRULE:FREQ=YEARLY;COUNT=1 |
|
94 |
END:VEVENT |
|
95 |
END:VCALENDAR""" |
|
96 | ||
97 | 79 |
ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST = """BEGIN:VCALENDAR |
98 | 80 |
VERSION:2.0 |
99 | 81 |
PRODID:-//foo.bar//EN |
... | ... | |
114 | 96 |
INVALID_ICS_SAMPLE = """content |
115 | 97 |
""" |
116 | 98 | |
99 | ||
117 | 100 |
with open('tests/data/atreal.ics') as f: |
118 | 101 |
ICS_ATREAL = f.read() |
119 | 102 | |
... | ... | |
205 | 188 |
agenda.save() |
206 | 189 |
desk = Desk(label='Test 1 desk', agenda=agenda) |
207 | 190 |
desk.save() |
208 |
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE) |
|
191 |
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file( |
|
192 |
ContentFile(ICS_SAMPLE, name='sample.ics')) |
|
209 | 193 |
assert exceptions_count == 2 |
210 | 194 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
211 | 195 | |
... | ... | |
221 | 205 |
if line.startswith('DTSTART:'): |
222 | 206 |
continue |
223 | 207 |
lines.append(line) |
224 |
ics_sample = "\n".join(lines)
|
|
208 |
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
|
|
225 | 209 |
with pytest.raises(ICSError) as e: |
226 |
desk.create_timeperiod_exceptions_from_ics(ics_sample)
|
|
210 |
desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
|
|
227 | 211 |
assert 'Event "Event 1" has no start date.' == str(e.value) |
228 | 212 | |
229 | 213 | |
... | ... | |
238 | 222 |
if line.startswith('DTEND:'): |
239 | 223 |
continue |
240 | 224 |
lines.append(line) |
241 |
ics_sample = "\n".join(lines)
|
|
242 |
desk.create_timeperiod_exceptions_from_ics(ics_sample)
|
|
225 |
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
|
|
226 |
desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
|
|
243 | 227 |
for exception in TimePeriodException.objects.filter(desk=desk): |
244 | 228 |
end_time = localtime(exception.end_datetime).time() |
245 | 229 |
assert end_time == datetime.time(23, 59, 59, 999999) |
... | ... | |
251 | 235 |
agenda.save() |
252 | 236 |
desk = Desk(label='Test 4 desk', agenda=agenda) |
253 | 237 |
desk.save() |
254 |
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 3 |
|
238 |
assert desk.import_timeperiod_exceptions_from_ics_file( |
|
239 |
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics')) == 3 |
|
255 | 240 |
assert TimePeriodException.objects.filter(desk=desk).count() == 3 |
256 |
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set([ |
|
257 |
make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]) |
|
258 |
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 0 |
|
259 |
# verify occurences are cleaned when count changed |
|
260 |
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_2) == 0 |
|
261 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
|
262 |
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set([ |
|
263 |
make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2018, 1, 2))]) |
|
264 | 241 | |
265 | 242 | |
266 | 243 |
def test_timeexception_creation_from_ics_with_dates(): |
... | ... | |
274 | 251 |
if line.startswith('RRULE:'): |
275 | 252 |
continue |
276 | 253 |
lines.append(line) |
277 |
ics_sample = "\n".join(lines)
|
|
278 |
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
|
|
254 |
ics_sample = ContentFile("\n".join(lines), name='sample.ics')
|
|
255 |
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
|
|
279 | 256 |
assert exceptions_count == 2 |
280 | 257 |
for exception in TimePeriodException.objects.filter(desk=desk): |
281 | 258 |
assert localtime(exception.start_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0)) |
... | ... | |
288 | 265 |
desk = Desk(label='Test 6 desk', agenda=agenda) |
289 | 266 |
desk.save() |
290 | 267 |
with pytest.raises(ICSError) as e: |
291 |
desk.create_timeperiod_exceptions_from_ics(INVALID_ICS_SAMPLE) |
|
268 |
desk.import_timeperiod_exceptions_from_ics_file( |
|
269 |
ContentFile(INVALID_ICS_SAMPLE, name='sample.ics')) |
|
292 | 270 |
assert str(e.value) == 'File format is invalid.' |
293 | 271 | |
294 | 272 | |
... | ... | |
298 | 276 |
desk = Desk(label='Test 7 desk', agenda=agenda) |
299 | 277 |
desk.save() |
300 | 278 |
with pytest.raises(ICSError) as e: |
301 |
desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS) |
|
279 |
desk.import_timeperiod_exceptions_from_ics_file( |
|
280 |
ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics')) |
|
302 | 281 |
assert str(e.value) == "The file doesn't contain any events." |
303 | 282 | |
304 | 283 | |
... | ... | |
311 | 290 |
mocked_response = mock.Mock() |
312 | 291 |
mocked_response.text = ICS_SAMPLE |
313 | 292 |
mocked_get.return_value = mocked_response |
314 |
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
293 |
exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
315 | 294 |
assert exceptions_count == 2 |
316 | 295 |
mocked_response.text = re.sub(r'SUMMARY:\w+', 'SUMMARY:New summmary', ICS_SAMPLE, re.MULTILINE) |
317 | 296 |
mocked_get.return_value = mocked_response |
318 |
desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
297 |
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
319 | 298 |
for timeperiod in TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id, desk=desk): |
320 | 299 |
assert 'New summary ' in timeperiod.label |
321 | 300 | |
322 | 301 |
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS |
323 | 302 |
mocked_get.return_value = mocked_response |
324 |
exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
303 |
exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
325 | 304 |
assert exceptions_count == 0 |
326 | 305 |
TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id).count() == 0 |
327 | 306 | |
... | ... | |
341 | 320 | |
342 | 321 |
mocked_get.side_effect = mocked_requests_connection_error |
343 | 322 |
with pytest.raises(ICSError) as e: |
344 |
desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
323 |
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
345 | 324 |
assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)." |
346 | 325 | |
347 | 326 | |
... | ... | |
361 | 340 |
mocked_get.side_effect = mocked_requests_http_forbidden_error |
362 | 341 | |
363 | 342 |
with pytest.raises(ICSError) as e: |
364 |
desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
343 |
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
365 | 344 |
assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)." |
366 | 345 | |
367 | 346 | |
... | ... | |
369 | 348 |
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys): |
370 | 349 |
agenda = Agenda(label=u'Test 11 agenda') |
371 | 350 |
agenda.save() |
372 |
desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http://example.com/sample.ics')
|
|
351 |
desk = Desk(label='Test 11 desk', agenda=agenda) |
|
373 | 352 |
desk.save() |
353 |
TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics') |
|
374 | 354 |
mocked_response = mock.Mock() |
375 | 355 |
mocked_response.status_code = 403 |
376 | 356 |
mocked_get.return_value = mocked_response |
... | ... | |
390 | 370 |
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog): |
391 | 371 |
agenda = Agenda(label=u'Test 11 agenda') |
392 | 372 |
agenda.save() |
393 |
desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics')
|
|
373 |
desk = Desk(label='Test 11 desk', agenda=agenda) |
|
394 | 374 |
desk.save() |
375 |
TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics') |
|
395 | 376 |
mocked_response = mock.Mock() |
396 | 377 |
mocked_response.text = ICS_SAMPLE |
397 | 378 |
mocked_get.return_value = mocked_response |
... | ... | |
446 | 427 |
agenda.save() |
447 | 428 |
desk = Desk(label='Test 1 desk', agenda=agenda) |
448 | 429 |
desk.save() |
449 |
exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_DURATION) |
|
430 |
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file( |
|
431 |
ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')) |
|
450 | 432 |
assert exceptions_count == 2 |
451 | 433 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
452 | 434 |
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set([ |
... | ... | |
467 | 449 |
agenda.save() |
468 | 450 |
desk = Desk(label='Test 4 desk', agenda=agenda) |
469 | 451 |
desk.save() |
470 |
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 2 |
|
452 |
assert desk.import_timeperiod_exceptions_from_ics_file( |
|
453 |
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics')) == 2 |
|
471 | 454 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
472 | 455 |
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set([ |
473 | 456 |
make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]) |
474 |
assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 0 |
|
475 | 457 | |
476 | 458 | |
477 | 459 |
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal(): |
... | ... | |
479 | 461 |
agenda.save() |
480 | 462 |
desk = Desk(label='Test atreal desk', agenda=agenda) |
481 | 463 |
desk.save() |
482 |
assert desk.create_timeperiod_exceptions_from_ics(ICS_ATREAL) |
|
464 |
assert desk.import_timeperiod_exceptions_from_ics_file( |
|
465 |
ContentFile(ICS_ATREAL, name='sample.ics')) |
|
483 | 466 | |
484 | 467 | |
485 | 468 |
def test_management_role_deletion(): |
tests/test_manager.py | ||
---|---|---|
15 | 15 |
from webtest import Upload |
16 | 16 | |
17 | 17 |
from chrono.agendas.models import (Agenda, Event, Booking, MeetingType, |
18 |
TimePeriod, Desk, TimePeriodException) |
|
18 |
TimePeriod, Desk, TimePeriodException, |
|
19 |
TimePeriodExceptionSource) |
|
19 | 20 | |
20 | 21 |
pytestmark = pytest.mark.django_db |
21 | 22 | |
... | ... | |
1142 | 1143 |
resp = resp.click('Settings') |
1143 | 1144 |
assert 'Import exceptions from .ics' in resp.text |
1144 | 1145 |
resp = resp.click('upload') |
1145 |
assert "You can upload a file or specify an address to a remote calendar." in resp |
|
1146 |
resp = resp.form.submit(status=302) |
|
1146 |
assert "To add new exceptions, you can upload a file or specify an address to a remote calendar." in resp |
|
1147 |
resp = resp.form.submit(status=200) |
|
1148 |
assert 'Please provide an ICS File or an URL.' in resp.text |
|
1147 | 1149 |
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow() |
1148 | 1150 |
resp = resp.click('Settings') |
1149 | 1151 |
resp = resp.click('upload') |
... | ... | |
1184 | 1186 |
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar') |
1185 | 1187 |
resp = resp.form.submit(status=302) |
1186 | 1188 |
assert TimePeriodException.objects.filter(desk=desk).count() == 1 |
1189 |
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1 |
|
1190 |
source = TimePeriodExceptionSource.objects.latest('pk') |
|
1191 |
exception = TimePeriodException.objects.latest('pk') |
|
1192 |
assert exception.source == source |
|
1193 |
assert source.ics_filename == 'exceptions.ics' |
|
1194 |
assert source.ics_url is None |
|
1187 | 1195 |
resp = resp.follow() |
1188 | 1196 |
assert 'An exception has been imported.' in resp.text |
1189 | 1197 | |
... | ... | |
1252 | 1260 |
assert TimePeriodException.objects.filter(desk=desk).count() == 1 |
1253 | 1261 |
exception = TimePeriodException.objects.get(desk=desk) |
1254 | 1262 |
assert exception.external_id == 'random-event-id' |
1255 |
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow() |
|
1256 |
resp = resp.click('Settings') |
|
1257 |
resp = resp.click('upload') |
|
1258 |
resp.form['ics_url'] = '' |
|
1259 |
resp = resp.form.submit(status=302) |
|
1260 |
assert not TimePeriodException.objects.filter( |
|
1261 |
desk=desk, |
|
1262 |
external_id='desk-%s:random-event-id' % desk.id).exists() |
|
1263 |
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1 |
|
1264 |
source = TimePeriodExceptionSource.objects.latest('pk') |
|
1265 |
exception = TimePeriodException.objects.latest('pk') |
|
1266 |
assert exception.source == source |
|
1267 |
assert source.ics_filename is None |
|
1268 |
assert source.ics_url == 'http://example.com/foo.ics' |
|
1263 | 1269 | |
1264 | 1270 | |
1265 | 1271 |
@mock.patch('chrono.agendas.models.requests.get') |
... | ... | |
1300 | 1306 |
VERSION:2.0 |
1301 | 1307 |
PRODID:-//foo.bar//EN |
1302 | 1308 |
END:VCALENDAR""" |
1303 |
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow() |
|
1304 |
resp = resp.click('Settings') |
|
1305 |
resp = resp.click('upload') |
|
1306 |
resp = resp.form.submit(status=302) |
|
1307 |
assert not TimePeriodException.objects.filter( |
|
1308 |
desk=desk, |
|
1309 |
external_id='random-event-id').exists() |
|
1310 | ||
1311 | ||
1312 |
@mock.patch('chrono.agendas.models.requests.get') |
|
1313 |
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user): |
|
1314 |
agenda = Agenda.objects.create(label='New Example', kind='meetings') |
|
1315 |
desk = Desk.objects.create(agenda=agenda, label='New Desk') |
|
1316 |
MeetingType(agenda=agenda, label='Bar').save() |
|
1317 |
login(app) |
|
1318 |
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow() |
|
1319 |
resp = resp.click('Settings') |
|
1320 |
assert 'Import exceptions from .ics' not in resp.text |
|
1321 | ||
1322 |
TimePeriod.objects.create( |
|
1323 |
weekday=1, desk=desk, |
|
1324 |
start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)) |
|
1325 | ||
1326 |
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow() |
|
1327 |
resp = resp.click('Settings') |
|
1328 |
resp = resp.click('upload') |
|
1329 |
resp.form['ics_url'] = 'http://example.com/foo.ics' |
|
1330 |
mocked_response = mock.Mock() |
|
1331 |
mocked_response.text = """BEGIN:VCALENDAR |
|
1332 |
VERSION:2.0 |
|
1333 |
PRODID:-//foo.bar//EN |
|
1334 |
BEGIN:VEVENT |
|
1335 |
UID:first-eventrandom-event-id |
|
1336 |
DTSTART:20180101 |
|
1337 |
DTEND:20180101 |
|
1338 |
SUMMARY:First test event |
|
1339 |
END:VEVENT |
|
1340 |
BEGIN:VEVENT |
|
1341 |
UID:second-eventrandom-event-id |
|
1342 |
DTSTART:20190101 |
|
1343 |
DTEND:20190101 |
|
1344 |
SUMMARY:Second test event |
|
1345 |
END:VEVENT |
|
1346 |
END:VCALENDAR""" |
|
1347 |
mocked_get.return_value = mocked_response |
|
1348 |
resp = resp.form.submit(status=302) |
|
1349 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
|
1350 |
resp = app.get('/manage/agendas/%d/' % agenda.pk).follow() |
|
1351 |
resp = resp.click('Settings') |
|
1352 |
resp = resp.click('upload') |
|
1353 |
resp.form['ics_url'] = 'http://example.com/foo.ics' |
|
1354 |
mocked_response.text = """BEGIN:VCALENDAR |
|
1355 |
VERSION:2.0 |
|
1356 |
PRODID:-//foo.bar//EN |
|
1357 |
BEGIN:VEVENT |
|
1358 |
UID:secord-eventrandom-event-id |
|
1359 |
DTSTART:20190101 |
|
1360 |
DTEND:20190101 |
|
1361 |
SUMMARY:Second test event |
|
1362 |
END:VEVENT |
|
1363 |
END:VCALENDAR""" |
|
1364 |
mocked_get.return_value = mocked_response |
|
1365 |
resp = resp.form.submit(status=302) |
|
1366 |
assert TimePeriodException.objects.filter(desk=desk).count() == 1 |
|
1367 | 1309 | |
1368 | 1310 | |
1369 | 1311 |
@mock.patch('chrono.agendas.models.requests.get') |
1370 |
- |