Projet

Général

Profil

0002-agendas-add-exception-source-model-29209.patch

Lauréline Guérin, 13 décembre 2019 14:46

Télécharger (39,8 ko)

Voir les différences:

Subject: [PATCH 2/4] agendas: add exception source model (#29209)

 .../sync_desks_timeperiod_exceptions.py       |   8 +-
 .../migrations/0008_auto_20160910_1319.py     |   2 +-
 .../0033_timeperiodexceptionsource.py         |  39 +++++
 .../agendas/migrations/0034_initial_source.py |  47 ++++++
 ...e_desk_timeperiod_exceptions_remote_url.py |  23 +++
 chrono/agendas/models.py                      |  92 ++++++------
 chrono/manager/forms.py                       |  17 ++-
 .../chrono/manager_import_exceptions.html     |  32 +++-
 chrono/manager/views.py                       |  16 +-
 tests/test_agendas.py                         | 139 +++++-------------
 tests/test_manager.py                         | 110 ++++----------
 11 files changed, 273 insertions(+), 252 deletions(-)
 create mode 100644 chrono/agendas/migrations/0033_timeperiodexceptionsource.py
 create mode 100644 chrono/agendas/migrations/0034_initial_source.py
 create mode 100644 chrono/agendas/migrations/0035_remove_desk_timeperiod_exceptions_remote_url.py
chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py
18 18

  
19 19
import sys
20 20

  
21
from chrono.agendas.models import Desk, ICSError
21
from chrono.agendas.models import TimePeriodExceptionSource, ICSError
22 22
from django.core.management.base import BaseCommand
23 23

  
24 24

  
......
26 26
    help = 'Synchronize time period exceptions from desks remote ics'
27 27

  
28 28
    def handle(self, **options):
29
        for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
29
        for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
30 30
            try:
31
                desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
31
                source.desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source)
32 32
            except ICSError as e:
33
                print(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e), file=sys.stderr)
33
                print(u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr)
chrono/agendas/migrations/0008_auto_20160910_1319.py
37 37
        migrations.AddField(
38 38
            model_name='agenda',
39 39
            name='kind',
40
            field=models.CharField(default=b'events', max_length=20, verbose_name='Kind', choices=[(b'events', 'Events'), (b'meetings', 'Meetings')]),
40
            field=models.CharField(default='events', max_length=20, verbose_name='Kind', choices=[('events', 'Events'), ('meetings', 'Meetings')]),
41 41
        ),
42 42
        migrations.AddField(
43 43
            model_name='timeperiod',
chrono/agendas/migrations/0033_timeperiodexceptionsource.py
1
# -*- coding: utf-8 -*-
2
from __future__ import unicode_literals
3

  
4
from django.db import migrations, models
5
import django.db.models.deletion
6

  
7

  
8
class Migration(migrations.Migration):
9

  
10
    dependencies = [
11
        ('agendas', '0032_auto_20191127_0919'),
12
    ]
13

  
14
    operations = [
15
        migrations.CreateModel(
16
            name='TimePeriodExceptionSource',
17
            fields=[
18
                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
19
                ('ics_filename', models.CharField(max_length=256, null=True)),
20
                ('ics_url', models.URLField(null=True, max_length=500)),
21
                ('desk', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='agendas.Desk')),
22
            ],
23
        ),
24
        migrations.AddField(
25
            model_name='timeperiodexception',
26
            name='source',
27
            field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='agendas.TimePeriodExceptionSource'),
28
        ),
29
        migrations.AlterField(
30
            model_name='desk',
31
            name='timeperiod_exceptions_remote_url',
32
            field=models.URLField(blank=True, max_length=500, null=True, verbose_name='URL to fetch time period exceptions from'),
33
        ),
34
        migrations.AlterField(
35
            model_name='timeperiodexception',
36
            name='external_id',
37
            field=models.CharField(blank=True, max_length=256, null=True, verbose_name='External ID'),
38
        ),
39
    ]
chrono/agendas/migrations/0034_initial_source.py
1
# -*- coding: utf-8 -*-
2
from __future__ import unicode_literals
3

  
4
from django.db import migrations
5

  
6

  
7
def create_source(apps, schema_editor):
8
    Desk = apps.get_model('agendas', 'Desk')
9
    TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
10
    for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
11
        # create a source for each remote url
12
        source = TimePeriodExceptionSource.objects.create(
13
            desk=desk,
14
            ics_url=desk.timeperiod_exceptions_remote_url)
15
        # clear timeperiod_exceptions_remote_url
16
        desk.timeperiod_exceptions_remote_url = None
17
        desk.save()
18
        # attach exceptions to the created source
19
        desk.timeperiodexception_set.filter(external_id__isnull=False).exclude(external_id='').update(source=source)
20

  
21

  
22
def init_remote_url(apps, schema_editor):
23
    Desk = apps.get_model('agendas', 'Desk')
24
    TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
25
    for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
26
        # set timeperiod_exceptions_remote_url
27
        source.desk.timeperiod_exceptions_remote_url = source.ics_url
28
        source.desk.save()
29
        # unlink exceptions
30
        source.timeperiodexception_set.update(source=None)
31
        # delete the source
32
        source.delete()
33
    # reset remote_url
34
    for desk in Desk.objects.filter(timeperiod_exceptions_remote_url__isnull=True):
35
        desk.timeperiod_exceptions_remote_url = ''
36
        desk.save()
37

  
38

  
39
class Migration(migrations.Migration):
40

  
41
    dependencies = [
42
        ('agendas', '0033_timeperiodexceptionsource'),
43
    ]
44

  
45
    operations = [
46
        migrations.RunPython(create_source, init_remote_url),
47
    ]
chrono/agendas/migrations/0035_remove_desk_timeperiod_exceptions_remote_url.py
1
# -*- coding: utf-8 -*-
2
# Generated by Django 1.11.18 on 2019-12-09 14:24
3
from __future__ import unicode_literals
4

  
5
from django.db import migrations
6

  
7

  
8
class Migration(migrations.Migration):
9

  
10
    dependencies = [
11
        ('agendas', '0034_initial_source'),
12
    ]
13

  
14
    operations = [
15
        migrations.RemoveField(
16
            model_name='desk',
17
            name='timeperiod_exceptions_remote_url',
18
        ),
19
        migrations.RemoveField(
20
            model_name='timeperiodexception',
21
            name='external_id',
22
        ),
23
    ]
chrono/agendas/models.py
469 469
    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
470 470
    label = models.CharField(_('Label'), max_length=150)
471 471
    slug = models.SlugField(_('Identifier'), max_length=160)
472
    timeperiod_exceptions_remote_url = models.URLField(
473
        _('URL to fetch time period exceptions from'), blank=True, max_length=500
474
    )
475 472

  
476 473
    def __str__(self):
477 474
        return self.label
......
528 525
        in_two_weeks = self.get_exceptions_within_two_weeks()
529 526
        return self.timeperiodexception_set.count() == len(in_two_weeks)
530 527

  
531
    def create_timeperiod_exceptions_from_remote_ics(self, url):
528
    def import_timeperiod_exceptions_from_remote_ics(self, ics_url, source=None):
532 529
        try:
533
            response = requests.get(url, proxies=settings.REQUESTS_PROXIES)
530
            response = requests.get(ics_url, proxies=settings.REQUESTS_PROXIES)
534 531
            response.raise_for_status()
535 532
        except requests.HTTPError as e:
536 533
            raise ICSError(
537 534
                _('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).')
538
                % {'url': url, 'status_code': e.response.status_code}
535
                % {'url': ics_url, 'status_code': e.response.status_code}
539 536
            )
540 537
        except requests.RequestException as e:
541 538
            raise ICSError(
542 539
                _('Failed to retrieve remote calendar (%(url)s, %(exception)s).')
543
                % {'url': url, 'exception': e}
540
                % {'url': ics_url, 'exception': e}
544 541
            )
545 542

  
546
        return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)
543
        if source is None:
544
            source = TimePeriodExceptionSource(desk=self, ics_url=ics_url)
545
        return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text)
547 546

  
548
    def remove_timeperiod_exceptions_from_remote_ics(self):
549
        TimePeriodException.objects.filter(desk=self).exclude(external_id='').delete()
547
    def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None):
548
        if source is None:
549
            source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name)
550
        return self._import_timeperiod_exceptions_from_ics(source=source, data=force_text(ics_file.read()))
550 551

  
551
    def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False, recurring_days=600):
552
    def _import_timeperiod_exceptions_from_ics(self, source, data, recurring_days=600):
552 553
        try:
553 554
            parsed = vobject.readOne(data)
554 555
        except vobject.base.ParseError:
......
556 557

  
557 558
        total_created = 0
558 559

  
559
        if not parsed.contents.get('vevent') and not keep_synced_by_uid:
560
        if not parsed.contents.get('vevent'):
560 561
            raise ICSError(_('The file doesn\'t contain any events.'))
561 562

  
562 563
        with transaction.atomic():
564
            if source.pk is None:
565
                source.save()
566
            # delete old exceptions related to this source
567
            source.timeperiodexception_set.all().delete()
568
            # create new exceptions
563 569
            update_datetime = now()
564 570
            for vevent in parsed.contents.get('vevent', []):
565 571
                if 'summary' in vevent.contents:
......
590 596
                        end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time()))
591 597
                        duration = end_dt - start_dt
592 598

  
593
                event = {}
594
                event['start_datetime'] = start_dt
595
                event['end_datetime'] = end_dt
596
                event['label'] = summary
597

  
598
                kwargs = {}
599
                kwargs['desk'] = self
600
                kwargs['recurrence_id'] = 0
601
                if keep_synced_by_uid:
602
                    kwargs['external_id'] = vevent.contents['uid'][0].value
603
                else:
604
                    kwargs['label'] = summary
599
                event = {
600
                    'start_datetime': start_dt,
601
                    'end_datetime': end_dt,
602
                    'label': summary,
603
                    'desk': self,
604
                    'source': source,
605
                    'recurrence_id': 0,
606
                }
605 607

  
606 608
                if not vevent.rruleset:
607 609
                    # classical event
608
                    obj, created = TimePeriodException.objects.update_or_create(defaults=event, **kwargs)
609
                    if created:
610
                        total_created += 1
610
                    TimePeriodException.objects.create(**event)
611
                    total_created += 1
611 612
                elif vevent.rruleset.count():
612 613
                    # recurring event until recurring_days in the future
613 614
                    from_dt = start_dt
......
621 622
                        if not is_aware(start_dt):
622 623
                            start_dt = make_aware(start_dt)
623 624
                        end_dt = start_dt + duration
624
                        kwargs['recurrence_id'] = i
625
                        event['recurrence_id'] = i
625 626
                        event['start_datetime'] = start_dt
626 627
                        event['end_datetime'] = end_dt
627
                        if end_dt < update_datetime:
628
                            TimePeriodException.objects.filter(**kwargs).update(**event)
629
                        else:
630
                            obj, created = TimePeriodException.objects.update_or_create(
631
                                defaults=event, **kwargs
632
                            )
633
                            if created:
634
                                total_created += 1
635
                    # delete unseen occurrences
636
                    kwargs.pop('recurrence_id', None)
637
                    TimePeriodException.objects.filter(recurrence_id__gt=i, **kwargs).delete()
638

  
639
            if keep_synced_by_uid:
640
                # delete all outdated exceptions from remote calendar
641
                TimePeriodException.objects.filter(update_datetime__lt=update_datetime, desk=self).exclude(
642
                    external_id=''
643
                ).delete()
628
                        if end_dt >= update_datetime:
629
                            TimePeriodException.objects.create(**event)
630
                            total_created += 1
644 631

  
645 632
        return total_created
646 633

  
......
661 648
        return openslots.search(aware_date, aware_next_date)
662 649

  
663 650

  
651
@python_2_unicode_compatible
652
class TimePeriodExceptionSource(models.Model):
653
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
654
    ics_filename = models.CharField(null=True, max_length=256)
655
    ics_url = models.URLField(null=True, max_length=500)
656

  
657
    def __str__(self):
658
        if self.ics_filename is not None:
659
            return self.ics_filename
660
        return self.ics_url
661

  
662

  
664 663
@python_2_unicode_compatible
665 664
class TimePeriodException(models.Model):
666 665
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
667
    external_id = models.CharField(_('External ID'), max_length=256, blank=True)
666
    source = models.ForeignKey(TimePeriodExceptionSource, on_delete=models.CASCADE, null=True)
668 667
    label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
669 668
    start_datetime = models.DateTimeField(_('Exception start time'))
670 669
    end_datetime = models.DateTimeField(_('Exception end time'))
......
739 738
            'label': self.label,
740 739
            'start_datetime': export_datetime(self.start_datetime),
741 740
            'end_datetime': export_datetime(self.end_datetime),
742
            'external_id': self.external_id,
743 741
            'recurrence_id': self.recurrence_id,
744 742
            'update_datetime': export_datetime(self.update_datetime),
745 743
        }
chrono/manager/forms.py
136 136
        widgets = {
137 137
            'agenda': forms.HiddenInput(),
138 138
        }
139
        exclude = ['slug', 'timeperiod_exceptions_remote_url']
139
        exclude = ['slug']
140 140

  
141 141

  
142 142
class DeskForm(forms.ModelForm):
......
145 145
        widgets = {
146 146
            'agenda': forms.HiddenInput(),
147 147
        }
148
        exclude = ['timeperiod_exceptions_remote_url']
148
        exclude = []
149 149

  
150 150

  
151 151
class TimePeriodExceptionForm(forms.ModelForm):
......
248 248

  
249 249

  
250 250
class ExceptionsImportForm(forms.ModelForm):
251
    class Meta:
252
        model = Desk
253
        fields = []
254

  
255 251
    ics_file = forms.FileField(label=_('ICS File'), required=False,
256 252
            help_text=_('ICS file containing events which will be considered as exceptions.'))
257 253
    ics_url = forms.URLField(label=_('URL'), required=False,
258 254
            help_text=_('URL to remote calendar which will be synchronised hourly.'))
259 255

  
256
    class Meta:
257
        model = Desk
258
        fields = []
259

  
260
    def clean(self, *args, **kwargs):
261
        cleaned_data = super().clean(*args, **kwargs)
262
        if not cleaned_data.get('ics_file') and not cleaned_data.get('ics_url'):
263
            raise forms.ValidationError(_('Please provide an ICS File or an URL.'))
264

  
260 265

  
261 266
class AgendasImportForm(forms.Form):
262 267
    agendas_json = forms.FileField(label=_('Agendas Export File'))
chrono/manager/templates/chrono/manager_import_exceptions.html
13 13
{% block content %}
14 14

  
15 15
<form method="post" enctype="multipart/form-data">
16
  <p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p>
16
  {% if exception_sources %}
17
  <table class="main">
18
    <thead>
19
      <tr>
20
        <th>{% trans "Exceptions" %}</th>
21
        <th></th>
22
        <th></th>
23
      </tr>
24
    </thead>
25
    <tbody>
26
      {% for object in exception_sources %}
27
      <tr>
28
        <td>
29
          <span title="{{ object }}">
30
            {% if object.ics_filename %}{{ object|truncatechars:50 }}{% else %}<a href="{{ object }}">{{ object|truncatechars:50 }}</a>{% endif %}
31
          </span>
32
        </td>
33
        <td>
34
          <a rel="popup" href="">
35
            {% if object.ics_filename %}{% trans "replace" %}{% else %}{% trans "refresh" %}{% endif %}
36
          </a>
37
        </td>
38
        <td><a rel="popup" href="">{% trans "remove" %}</a></td>
39
      </tr>
40
      {% endfor %}
41
    </tbody>
42
   </table>
43
   <br />
44
  {% endif %}
45

  
46
  <p class="notice">{% trans "To add new exceptions, you can upload a file or specify an address to a remote calendar." %}</p>
17 47
  {% csrf_token %}
18 48
  {{ form.as_p }}
19 49
  <p>
chrono/manager/views.py
802 802
    form_class = ExceptionsImportForm
803 803
    template_name = 'chrono/manager_import_exceptions.html'
804 804

  
805
    def get_initial(self):
806
        return {'ics_url': self.get_object().timeperiod_exceptions_remote_url}
805
    def get_context_data(self, **kwargs):
806
        context = super(DeskImportTimePeriodExceptionsView, self).get_context_data(**kwargs)
807
        context['exception_sources'] = self.get_object().timeperiodexceptionsource_set.all()
808
        return context
807 809

  
808 810
    def form_valid(self, form):
809 811
        exceptions = None
810 812
        try:
811 813
            if form.cleaned_data['ics_file']:
812
                ics_file_content = force_text(form.cleaned_data['ics_file'].read())
813
                exceptions = form.instance.create_timeperiod_exceptions_from_ics(ics_file_content)
814
                exceptions = form.instance.import_timeperiod_exceptions_from_ics_file(form.cleaned_data['ics_file'])
814 815
            elif form.cleaned_data['ics_url']:
815
                exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url'])
816
            else:
817
                form.instance.remove_timeperiod_exceptions_from_remote_ics()
816
                exceptions = form.instance.import_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url'])
818 817
        except ICSError as e:
819 818
            form.add_error(None, force_text(e))
820 819
            return self.form_invalid(form)
821
        form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url']
822
        form.instance.save()
820

  
823 821
        if exceptions is not None:
824 822
            message = ungettext('An exception has been imported.',
825 823
                                '%(count)d exceptions have been imported.', exceptions)
tests/test_agendas.py
1 1
import pytest
2 2
import datetime
3 3
import mock
4
import re
5 4
import requests
6 5

  
7 6

  
8 7
from django.utils.timezone import now, make_aware, localtime
9 8
from django.contrib.auth.models import Group
9
from django.core.files.base import ContentFile
10 10
from django.core.management import call_command
11
from django.core.management.base import CommandError
12 11

  
13 12
from chrono.agendas.models import (
14 13
    Agenda,
......
16 15
    Booking,
17 16
    MeetingType,
18 17
    Desk,
19
    TimePeriod,
20 18
    TimePeriodException,
19
    TimePeriodExceptionSource,
21 20
    ICSError,
22 21
)
23 22

  
......
28 27
PRODID:-//foo.bar//EN
29 28
BEGIN:VEVENT
30 29
DTSTAMP:20170824T082855Z
31
UID:8c4c219889d244232c0a565f4950c3ff65dd5d64
32 30
DTSTART:20170831T170800Z
33 31
DTEND:20170831T203400Z
34 32
SEQUENCE:1
......
36 34
END:VEVENT
37 35
BEGIN:VEVENT
38 36
DTSTAMP:20170824T092855Z
39
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
40 37
DTSTART:20170830T180800Z
41 38
DTEND:20170831T223400Z
42 39
SEQUENCE:2
......
48 45
PRODID:-//foo.bar//EN
49 46
BEGIN:VEVENT
50 47
DTSTAMP:20170824T082855Z
51
UID:8c4c219889d244232c0a565f4950c3ff65dd5d64
52 48
DTSTART:20170831T170800Z
53 49
DURATION:PT3H26M
54 50
SEQUENCE:1
......
56 52
END:VEVENT
57 53
BEGIN:VEVENT
58 54
DTSTAMP:20170824T092855Z
59
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
60 55
DTSTART:20170830T180800Z
61 56
DURATION:P1D4H26M
62 57
SEQUENCE:2
......
83 78
END:VEVENT
84 79
END:VCALENDAR"""
85 80

  
86
ICS_SAMPLE_WITH_RECURRENT_EVENT_2 = """BEGIN:VCALENDAR
87
VERSION:2.0
88
PRODID:-//foo.bar//EN
89
BEGIN:VEVENT
90
DTSTAMP:20170720T145803Z
91
DESCRIPTION:Vacances d'ete
92
DTSTART;VALUE=DATE:20180101
93
DTEND;VALUE=DATE:20180101
94
SUMMARY:reccurent event
95
END:VEVENT
96
BEGIN:VEVENT
97
DTSTAMP:20170824T082855Z
98
DTSTART:20180102
99
DTEND:20180101
100
SUMMARY:New Year's Eve
101
RRULE:FREQ=YEARLY;COUNT=1
102
END:VEVENT
103
END:VCALENDAR"""
104

  
105 81
ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST = """BEGIN:VCALENDAR
106 82
VERSION:2.0
107 83
PRODID:-//foo.bar//EN
......
122 98
INVALID_ICS_SAMPLE = """content
123 99
"""
124 100

  
101

  
125 102
with open('tests/data/atreal.ics') as f:
126 103
    ICS_ATREAL = f.read()
127 104

  
......
213 190
    agenda.save()
214 191
    desk = Desk(label='Test 1 desk', agenda=agenda)
215 192
    desk.save()
216
    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE)
193
    exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
194
        ContentFile(ICS_SAMPLE, name='sample.ics')
195
    )
217 196
    assert exceptions_count == 2
218 197
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
219 198

  
......
229 208
        if line.startswith('DTSTART:'):
230 209
            continue
231 210
        lines.append(line)
232
    ics_sample = "\n".join(lines)
211
    ics_sample = ContentFile("\n".join(lines), name='sample.ics')
233 212
    with pytest.raises(ICSError) as e:
234
        exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
213
        desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
235 214
    assert 'Event "Event 1" has no start date.' == str(e.value)
236 215

  
237 216

  
......
246 225
        if line.startswith('DTEND:'):
247 226
            continue
248 227
        lines.append(line)
249
    ics_sample = "\n".join(lines)
250
    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
228
    ics_sample = ContentFile("\n".join(lines), name='sample.ics')
229
    desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
251 230
    for exception in TimePeriodException.objects.filter(desk=desk):
252 231
        end_time = localtime(exception.end_datetime).time()
253 232
        assert end_time == datetime.time(23, 59, 59, 999999)
......
259 238
    agenda.save()
260 239
    desk = Desk(label='Test 4 desk', agenda=agenda)
261 240
    desk.save()
262
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 3
263
    assert TimePeriodException.objects.filter(desk=desk).count() == 3
264
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
265
        [make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
266
    )
267
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 0
268
    # verify occurences are cleaned when count changed
269
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_2) == 0
270
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
271
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
272
        [make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2018, 1, 2))]
241
    assert (
242
        desk.import_timeperiod_exceptions_from_ics_file(
243
            ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics')
244
        )
245
        == 3
273 246
    )
247
    assert TimePeriodException.objects.filter(desk=desk).count() == 3
274 248

  
275 249

  
276 250
def test_timeexception_creation_from_ics_with_dates():
......
284 258
        if line.startswith('RRULE:'):
285 259
            continue
286 260
        lines.append(line)
287
    ics_sample = "\n".join(lines)
288
    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
261
    ics_sample = ContentFile("\n".join(lines), name='sample.ics')
262
    exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
289 263
    assert exceptions_count == 2
290 264
    for exception in TimePeriodException.objects.filter(desk=desk):
291 265
        assert localtime(exception.start_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0))
......
298 272
    desk = Desk(label='Test 6 desk', agenda=agenda)
299 273
    desk.save()
300 274
    with pytest.raises(ICSError) as e:
301
        exceptions_count = desk.create_timeperiod_exceptions_from_ics(INVALID_ICS_SAMPLE)
275
        desk.import_timeperiod_exceptions_from_ics_file(ContentFile(INVALID_ICS_SAMPLE, name='sample.ics'))
302 276
    assert str(e.value) == 'File format is invalid.'
303 277

  
304 278

  
......
308 282
    desk = Desk(label='Test 7 desk', agenda=agenda)
309 283
    desk.save()
310 284
    with pytest.raises(ICSError) as e:
311
        exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS)
285
        desk.import_timeperiod_exceptions_from_ics_file(
286
            ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics')
287
        )
312 288
    assert str(e.value) == "The file doesn't contain any events."
313 289

  
314 290

  
......
321 297
    mocked_response = mock.Mock()
322 298
    mocked_response.text = ICS_SAMPLE
323 299
    mocked_get.return_value = mocked_response
324
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
300
    exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
325 301
    assert exceptions_count == 2
326
    mocked_response.text = re.sub('SUMMARY:\w+', 'SUMMARY:New summmary', ICS_SAMPLE, re.MULTILINE)
327
    mocked_get.return_value = mocked_response
328
    desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
329
    for timeperiod in TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id, desk=desk):
330
        assert 'New summary ' in timeperiod.label
331 302

  
332 303
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
333 304
    mocked_get.return_value = mocked_response
334
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
335
    assert exceptions_count == 0
336
    TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id).count() == 0
305
    with pytest.raises(ICSError) as e:
306
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
307
    assert str(e.value) == "The file doesn't contain any events."
337 308

  
338 309

  
339 310
@mock.patch('chrono.agendas.models.requests.get')
......
351 322

  
352 323
    mocked_get.side_effect = mocked_requests_connection_error
353 324
    with pytest.raises(ICSError) as e:
354
        exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
325
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
355 326
    assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)."
356 327

  
357 328

  
......
371 342
    mocked_get.side_effect = mocked_requests_http_forbidden_error
372 343

  
373 344
    with pytest.raises(ICSError) as e:
374
        exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
345
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
375 346
    assert (
376 347
        str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)."
377 348
    )
......
381 352
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
382 353
    agenda = Agenda(label=u'Test 11 agenda')
383 354
    agenda.save()
384
    desk = Desk(
385
        label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http://example.com/sample.ics'
386
    )
355
    desk = Desk(label='Test 11 desk', agenda=agenda)
387 356
    desk.save()
357
    TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
388 358
    mocked_response = mock.Mock()
389 359
    mocked_response.status_code = 403
390 360
    mocked_get.return_value = mocked_response
......
401 371
    )
402 372

  
403 373

  
404
@mock.patch('chrono.agendas.models.requests.get')
405
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
406
    agenda = Agenda(label=u'Test 11 agenda')
407
    agenda.save()
408
    desk = Desk(
409
        label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics'
410
    )
411
    desk.save()
412
    mocked_response = mock.Mock()
413
    mocked_response.text = ICS_SAMPLE
414
    mocked_get.return_value = mocked_response
415
    call_command('sync_desks_timeperiod_exceptions')
416
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
417
    mocked_response.text = """BEGIN:VCALENDAR
418
VERSION:2.0
419
PRODID:-//foo.bar//EN
420
BEGIN:VEVENT
421
DTSTAMP:20180824T082855Z
422
UID:new-and-unique-uid
423
DTSTART:20180831T170800Z
424
DTEND:20180831T203400Z
425
SUMMARY:Wonderfull event
426
END:VEVENT
427
END:VCALENDAR"""
428
    mocked_get.return_value = mocked_response
429
    call_command('sync_desks_timeperiod_exceptions')
430
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
431
    exception = TimePeriodException.objects.get(desk=desk)
432
    assert exception.external_id == 'new-and-unique-uid'
433
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
434
    mocked_get.return_value = mocked_response
435
    call_command('sync_desks_timeperiod_exceptions')
436
    assert not TimePeriodException.objects.filter(desk=desk).exists()
437

  
438

  
439 374
def test_base_meeting_duration():
440 375
    agenda = Agenda(label='Meeting', kind='meetings')
441 376
    agenda.save()
......
463 398
    agenda.save()
464 399
    desk = Desk(label='Test 1 desk', agenda=agenda)
465 400
    desk.save()
466
    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_DURATION)
401
    exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
402
        ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
403
    )
467 404
    assert exceptions_count == 2
468 405
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
469 406
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
......
488 425
    agenda.save()
489 426
    desk = Desk(label='Test 4 desk', agenda=agenda)
490 427
    desk.save()
491
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 2
428
    assert (
429
        desk.import_timeperiod_exceptions_from_ics_file(
430
            ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics')
431
        )
432
        == 2
433
    )
492 434
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
493 435
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
494 436
        [make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
495 437
    )
496
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 0
497 438

  
498 439

  
499 440
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
......
501 442
    agenda.save()
502 443
    desk = Desk(label='Test atreal desk', agenda=agenda)
503 444
    desk.save()
504
    assert desk.create_timeperiod_exceptions_from_ics(ICS_ATREAL)
445
    assert desk.import_timeperiod_exceptions_from_ics_file(ContentFile(ICS_ATREAL, name='sample.ics'))
505 446

  
506 447

  
507 448
def test_management_role_deletion():
tests/test_manager.py
14 14
import requests
15 15
from webtest import Upload
16 16

  
17
from chrono.wsgi import application
18

  
19
from chrono.agendas.models import Agenda, Event, Booking, MeetingType, TimePeriod, Desk, TimePeriodException
20
from chrono.manager.utils import export_site
17
from chrono.agendas.models import (
18
    Agenda,
19
    Event,
20
    Booking,
21
    MeetingType,
22
    TimePeriod,
23
    Desk,
24
    TimePeriodException,
25
    TimePeriodExceptionSource,
26
)
21 27

  
22 28
pytestmark = pytest.mark.django_db
23 29

  
......
1158 1164
    resp = resp.click('Settings')
1159 1165
    assert 'Import exceptions from .ics' in resp.text
1160 1166
    resp = resp.click('upload')
1161
    assert "You can upload a file or specify an address to a remote calendar." in resp
1162
    resp = resp.form.submit(status=302)
1167
    assert "To add new exceptions, you can upload a file or specify an address to a remote calendar." in resp
1168
    resp = resp.form.submit(status=200)
1169
    assert 'Please provide an ICS File or an URL.' in resp.text
1163 1170
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1164 1171
    resp = resp.click('Settings')
1165 1172
    resp = resp.click('upload')
......
1200 1207
    resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
1201 1208
    resp = resp.form.submit(status=302)
1202 1209
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
1210
    assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
1211
    source = TimePeriodExceptionSource.objects.latest('pk')
1212
    exception = TimePeriodException.objects.latest('pk')
1213
    assert exception.source == source
1214
    assert source.ics_filename == 'exceptions.ics'
1215
    assert source.ics_url is None
1203 1216
    resp = resp.follow()
1204 1217
    assert 'An exception has been imported.' in resp.text
1205 1218

  
......
1257 1270
VERSION:2.0
1258 1271
PRODID:-//foo.bar//EN
1259 1272
BEGIN:VEVENT
1260
UID:random-event-id
1261 1273
DTSTART:20180101
1262 1274
DTEND:20180101
1263 1275
SUMMARY:New Year's Eve
......
1267 1279
    resp = resp.form.submit(status=302)
1268 1280
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
1269 1281
    exception = TimePeriodException.objects.get(desk=desk)
1270
    assert exception.external_id == 'random-event-id'
1271
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1272
    resp = resp.click('Settings')
1273
    resp = resp.click('upload')
1274
    resp.form['ics_url'] = ''
1275
    resp = resp.form.submit(status=302)
1276
    assert not TimePeriodException.objects.filter(
1277
        desk=desk, external_id='desk-%s:random-event-id' % desk.id
1278
    ).exists()
1282
    assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
1283
    source = TimePeriodExceptionSource.objects.latest('pk')
1284
    exception = TimePeriodException.objects.latest('pk')
1285
    assert exception.source == source
1286
    assert source.ics_filename is None
1287
    assert source.ics_url == 'http://example.com/foo.ics'
1279 1288

  
1280 1289

  
1281 1290
@mock.patch('chrono.agendas.models.requests.get')
......
1301 1310
VERSION:2.0
1302 1311
PRODID:-//foo.bar//EN
1303 1312
BEGIN:VEVENT
1304
UID:random-event-id
1305 1313
DTSTART:20180101
1306 1314
DTEND:20180101
1307 1315
SUMMARY:New Year's Eve
1308 1316
END:VEVENT
1309
END:VCALENDAR"""
1310
    mocked_get.return_value = mocked_response
1311
    resp = resp.form.submit(status=302)
1312
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
1313
    exception = TimePeriodException.objects.get(desk=desk)
1314
    assert exception.external_id == 'random-event-id'
1315
    mocked_response.text = """BEGIN:VCALENDAR
1316
VERSION:2.0
1317
PRODID:-//foo.bar//EN
1318
END:VCALENDAR"""
1319
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1320
    resp = resp.click('Settings')
1321
    resp = resp.click('upload')
1322
    resp = resp.form.submit(status=302)
1323
    assert not TimePeriodException.objects.filter(desk=desk, external_id='random-event-id').exists()
1324

  
1325

  
1326
@mock.patch('chrono.agendas.models.requests.get')
1327
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
1328
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
1329
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
1330
    MeetingType(agenda=agenda, label='Bar').save()
1331
    login(app)
1332
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1333
    resp = resp.click('Settings')
1334
    assert 'Import exceptions from .ics' not in resp.text
1335

  
1336
    TimePeriod.objects.create(
1337
        weekday=1, desk=desk, start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)
1338
    )
1339

  
1340
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1341
    resp = resp.click('Settings')
1342
    resp = resp.click('upload')
1343
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1344
    mocked_response = mock.Mock()
1345
    mocked_response.text = """BEGIN:VCALENDAR
1346
VERSION:2.0
1347
PRODID:-//foo.bar//EN
1348
BEGIN:VEVENT
1349
UID:first-eventrandom-event-id
1350
DTSTART:20180101
1351
DTEND:20180101
1352
SUMMARY:First test event
1353
END:VEVENT
1354
BEGIN:VEVENT
1355
UID:second-eventrandom-event-id
1356
DTSTART:20190101
1357
DTEND:20190101
1358
SUMMARY:Second test event
1359
END:VEVENT
1360
END:VCALENDAR"""
1361
    mocked_get.return_value = mocked_response
1362
    resp = resp.form.submit(status=302)
1363
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
1364
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1365
    resp = resp.click('Settings')
1366
    resp = resp.click('upload')
1367
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1368
    mocked_response.text = """BEGIN:VCALENDAR
1369
VERSION:2.0
1370
PRODID:-//foo.bar//EN
1371
BEGIN:VEVENT
1372
UID:secord-eventrandom-event-id
1373
DTSTART:20190101
1374
DTEND:20190101
1375
SUMMARY:Second test event
1376
END:VEVENT
1377 1317
END:VCALENDAR"""
1378 1318
    mocked_get.return_value = mocked_response
1379 1319
    resp = resp.form.submit(status=302)
1380
-