Projet

Général

Profil

0002-agendas-add-exception-source-model-29209.patch

Lauréline Guérin, 13 décembre 2019 22:19

Télécharger (40,3 ko)

Voir les différences:

Subject: [PATCH 2/4] agendas: add exception source model (#29209)

 .../sync_desks_timeperiod_exceptions.py       |  10 +-
 .../migrations/0008_auto_20160910_1319.py     |   4 +-
 .../0033_timeperiodexceptionsource.py         |  46 ++++++
 .../agendas/migrations/0034_initial_source.py |  49 ++++++
 ...e_desk_timeperiod_exceptions_remote_url.py |  17 ++
 chrono/agendas/models.py                      |  92 ++++++-----
 chrono/manager/forms.py                       |  17 +-
 .../chrono/manager_import_exceptions.html     |  32 +++-
 chrono/manager/views.py                       |  19 +--
 tests/test_agendas.py                         | 147 ++++++------------
 tests/test_manager.py                         | 109 +++----------
 11 files changed, 291 insertions(+), 251 deletions(-)
 create mode 100644 chrono/agendas/migrations/0033_timeperiodexceptionsource.py
 create mode 100644 chrono/agendas/migrations/0034_initial_source.py
 create mode 100644 chrono/agendas/migrations/0035_remove_desk_timeperiod_exceptions_remote_url.py
chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py
20 20

  
21 21
from django.core.management.base import BaseCommand
22 22

  
23
from chrono.agendas.models import Desk, ICSError
23
from chrono.agendas.models import ICSError, TimePeriodExceptionSource
24 24

  
25 25

  
26 26
class Command(BaseCommand):
27 27
    help = 'Synchronize time period exceptions from desks remote ics'
28 28

  
29 29
    def handle(self, **options):
30
        for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
30
        for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
31 31
            try:
32
                desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
32
                source.desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source)
33 33
            except ICSError as e:
34
                print(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e), file=sys.stderr)
34
                print(
35
                    u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr
36
                )
chrono/agendas/migrations/0008_auto_20160910_1319.py
54 54
            model_name='agenda',
55 55
            name='kind',
56 56
            field=models.CharField(
57
                default=b'events',
57
                default='events',
58 58
                max_length=20,
59 59
                verbose_name='Kind',
60
                choices=[(b'events', 'Events'), (b'meetings', 'Meetings')],
60
                choices=[('events', 'Events'), ('meetings', 'Meetings')],
61 61
            ),
62 62
        ),
63 63
        migrations.AddField(
chrono/agendas/migrations/0033_timeperiodexceptionsource.py
1
# -*- coding: utf-8 -*-
2
from __future__ import unicode_literals
3

  
4
import django.db.models.deletion
5
from django.db import migrations, models
6

  
7

  
8
class Migration(migrations.Migration):
9

  
10
    dependencies = [
11
        ('agendas', '0032_auto_20191127_0919'),
12
    ]
13

  
14
    operations = [
15
        migrations.CreateModel(
16
            name='TimePeriodExceptionSource',
17
            fields=[
18
                (
19
                    'id',
20
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
21
                ),
22
                ('ics_filename', models.CharField(max_length=256, null=True)),
23
                ('ics_url', models.URLField(null=True, max_length=500)),
24
                ('desk', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='agendas.Desk')),
25
            ],
26
        ),
27
        migrations.AddField(
28
            model_name='timeperiodexception',
29
            name='source',
30
            field=models.ForeignKey(
31
                null=True, on_delete=django.db.models.deletion.CASCADE, to='agendas.TimePeriodExceptionSource'
32
            ),
33
        ),
34
        migrations.AlterField(
35
            model_name='desk',
36
            name='timeperiod_exceptions_remote_url',
37
            field=models.URLField(
38
                blank=True, max_length=500, null=True, verbose_name='URL to fetch time period exceptions from'
39
            ),
40
        ),
41
        migrations.AlterField(
42
            model_name='timeperiodexception',
43
            name='external_id',
44
            field=models.CharField(blank=True, max_length=256, null=True, verbose_name='External ID'),
45
        ),
46
    ]
chrono/agendas/migrations/0034_initial_source.py
1
# -*- coding: utf-8 -*-
2
from __future__ import unicode_literals
3

  
4
from django.db import migrations
5

  
6

  
7
def create_source(apps, schema_editor):
8
    Desk = apps.get_model('agendas', 'Desk')
9
    TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
10
    for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
11
        # create a source for each remote url
12
        source = TimePeriodExceptionSource.objects.create(
13
            desk=desk, ics_url=desk.timeperiod_exceptions_remote_url
14
        )
15
        # clear timeperiod_exceptions_remote_url
16
        desk.timeperiod_exceptions_remote_url = None
17
        desk.save()
18
        # attach exceptions to the created source
19
        desk.timeperiodexception_set.filter(external_id__isnull=False).exclude(external_id='').update(
20
            source=source
21
        )
22

  
23

  
24
def init_remote_url(apps, schema_editor):
25
    Desk = apps.get_model('agendas', 'Desk')
26
    TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
27
    for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
28
        # set timeperiod_exceptions_remote_url
29
        source.desk.timeperiod_exceptions_remote_url = source.ics_url
30
        source.desk.save()
31
        # unlink exceptions
32
        source.timeperiodexception_set.update(source=None)
33
        # delete the source
34
        source.delete()
35
    # reset remote_url
36
    for desk in Desk.objects.filter(timeperiod_exceptions_remote_url__isnull=True):
37
        desk.timeperiod_exceptions_remote_url = ''
38
        desk.save()
39

  
40

  
41
class Migration(migrations.Migration):
42

  
43
    dependencies = [
44
        ('agendas', '0033_timeperiodexceptionsource'),
45
    ]
46

  
47
    operations = [
48
        migrations.RunPython(create_source, init_remote_url),
49
    ]
chrono/agendas/migrations/0035_remove_desk_timeperiod_exceptions_remote_url.py
1
# -*- coding: utf-8 -*-
2
# Generated by Django 1.11.18 on 2019-12-09 14:24
3
from __future__ import unicode_literals
4

  
5
from django.db import migrations
6

  
7

  
8
class Migration(migrations.Migration):
9

  
10
    dependencies = [
11
        ('agendas', '0034_initial_source'),
12
    ]
13

  
14
    operations = [
15
        migrations.RemoveField(model_name='desk', name='timeperiod_exceptions_remote_url',),
16
        migrations.RemoveField(model_name='timeperiodexception', name='external_id',),
17
    ]
chrono/agendas/models.py
467 467
    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
468 468
    label = models.CharField(_('Label'), max_length=150)
469 469
    slug = models.SlugField(_('Identifier'), max_length=160)
470
    timeperiod_exceptions_remote_url = models.URLField(
471
        _('URL to fetch time period exceptions from'), blank=True, max_length=500
472
    )
473 470

  
474 471
    def __str__(self):
475 472
        return self.label
......
526 523
        in_two_weeks = self.get_exceptions_within_two_weeks()
527 524
        return self.timeperiodexception_set.count() == len(in_two_weeks)
528 525

  
529
    def create_timeperiod_exceptions_from_remote_ics(self, url):
526
    def import_timeperiod_exceptions_from_remote_ics(self, ics_url, source=None):
530 527
        try:
531
            response = requests.get(url, proxies=settings.REQUESTS_PROXIES)
528
            response = requests.get(ics_url, proxies=settings.REQUESTS_PROXIES)
532 529
            response.raise_for_status()
533 530
        except requests.HTTPError as e:
534 531
            raise ICSError(
535 532
                _('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).')
536
                % {'url': url, 'status_code': e.response.status_code}
533
                % {'url': ics_url, 'status_code': e.response.status_code}
537 534
            )
538 535
        except requests.RequestException as e:
539 536
            raise ICSError(
540 537
                _('Failed to retrieve remote calendar (%(url)s, %(exception)s).')
541
                % {'url': url, 'exception': e}
538
                % {'url': ics_url, 'exception': e}
542 539
            )
543 540

  
544
        return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)
541
        if source is None:
542
            source = TimePeriodExceptionSource(desk=self, ics_url=ics_url)
543
        return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text)
545 544

  
546
    def remove_timeperiod_exceptions_from_remote_ics(self):
547
        TimePeriodException.objects.filter(desk=self).exclude(external_id='').delete()
545
    def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None):
546
        if source is None:
547
            source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name)
548
        return self._import_timeperiod_exceptions_from_ics(source=source, data=force_text(ics_file.read()))
548 549

  
549
    def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False, recurring_days=600):
550
    def _import_timeperiod_exceptions_from_ics(self, source, data, recurring_days=600):
550 551
        try:
551 552
            parsed = vobject.readOne(data)
552 553
        except vobject.base.ParseError:
......
554 555

  
555 556
        total_created = 0
556 557

  
557
        if not parsed.contents.get('vevent') and not keep_synced_by_uid:
558
        if not parsed.contents.get('vevent'):
558 559
            raise ICSError(_('The file doesn\'t contain any events.'))
559 560

  
560 561
        with transaction.atomic():
562
            if source.pk is None:
563
                source.save()
564
            # delete old exceptions related to this source
565
            source.timeperiodexception_set.all().delete()
566
            # create new exceptions
561 567
            update_datetime = now()
562 568
            for vevent in parsed.contents.get('vevent', []):
563 569
                if 'summary' in vevent.contents:
......
588 594
                        end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time()))
589 595
                        duration = end_dt - start_dt
590 596

  
591
                event = {}
592
                event['start_datetime'] = start_dt
593
                event['end_datetime'] = end_dt
594
                event['label'] = summary
595

  
596
                kwargs = {}
597
                kwargs['desk'] = self
598
                kwargs['recurrence_id'] = 0
599
                if keep_synced_by_uid:
600
                    kwargs['external_id'] = vevent.contents['uid'][0].value
601
                else:
602
                    kwargs['label'] = summary
597
                event = {
598
                    'start_datetime': start_dt,
599
                    'end_datetime': end_dt,
600
                    'label': summary,
601
                    'desk': self,
602
                    'source': source,
603
                    'recurrence_id': 0,
604
                }
603 605

  
604 606
                if not vevent.rruleset:
605 607
                    # classical event
606
                    obj, created = TimePeriodException.objects.update_or_create(defaults=event, **kwargs)
607
                    if created:
608
                        total_created += 1
608
                    TimePeriodException.objects.create(**event)
609
                    total_created += 1
609 610
                elif vevent.rruleset.count():
610 611
                    # recurring event until recurring_days in the future
611 612
                    from_dt = start_dt
......
619 620
                        if not is_aware(start_dt):
620 621
                            start_dt = make_aware(start_dt)
621 622
                        end_dt = start_dt + duration
622
                        kwargs['recurrence_id'] = i
623
                        event['recurrence_id'] = i
623 624
                        event['start_datetime'] = start_dt
624 625
                        event['end_datetime'] = end_dt
625
                        if end_dt < update_datetime:
626
                            TimePeriodException.objects.filter(**kwargs).update(**event)
627
                        else:
628
                            obj, created = TimePeriodException.objects.update_or_create(
629
                                defaults=event, **kwargs
630
                            )
631
                            if created:
632
                                total_created += 1
633
                    # delete unseen occurrences
634
                    kwargs.pop('recurrence_id', None)
635
                    TimePeriodException.objects.filter(recurrence_id__gt=i, **kwargs).delete()
636

  
637
            if keep_synced_by_uid:
638
                # delete all outdated exceptions from remote calendar
639
                TimePeriodException.objects.filter(update_datetime__lt=update_datetime, desk=self).exclude(
640
                    external_id=''
641
                ).delete()
626
                        if end_dt >= update_datetime:
627
                            TimePeriodException.objects.create(**event)
628
                            total_created += 1
642 629

  
643 630
        return total_created
644 631

  
......
659 646
        return openslots.search(aware_date, aware_next_date)
660 647

  
661 648

  
649
@python_2_unicode_compatible
650
class TimePeriodExceptionSource(models.Model):
651
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
652
    ics_filename = models.CharField(null=True, max_length=256)
653
    ics_url = models.URLField(null=True, max_length=500)
654

  
655
    def __str__(self):
656
        if self.ics_filename is not None:
657
            return self.ics_filename
658
        return self.ics_url
659

  
660

  
662 661
@python_2_unicode_compatible
663 662
class TimePeriodException(models.Model):
664 663
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
665
    external_id = models.CharField(_('External ID'), max_length=256, blank=True)
664
    source = models.ForeignKey(TimePeriodExceptionSource, on_delete=models.CASCADE, null=True)
666 665
    label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
667 666
    start_datetime = models.DateTimeField(_('Exception start time'))
668 667
    end_datetime = models.DateTimeField(_('Exception end time'))
......
737 736
            'label': self.label,
738 737
            'start_datetime': export_datetime(self.start_datetime),
739 738
            'end_datetime': export_datetime(self.end_datetime),
740
            'external_id': self.external_id,
741 739
            'recurrence_id': self.recurrence_id,
742 740
            'update_datetime': export_datetime(self.update_datetime),
743 741
        }
chrono/manager/forms.py
141 141
        widgets = {
142 142
            'agenda': forms.HiddenInput(),
143 143
        }
144
        exclude = ['slug', 'timeperiod_exceptions_remote_url']
144
        exclude = ['slug']
145 145

  
146 146

  
147 147
class DeskForm(forms.ModelForm):
......
150 150
        widgets = {
151 151
            'agenda': forms.HiddenInput(),
152 152
        }
153
        exclude = ['timeperiod_exceptions_remote_url']
153
        exclude = []
154 154

  
155 155

  
156 156
class TimePeriodExceptionForm(forms.ModelForm):
......
261 261

  
262 262

  
263 263
class ExceptionsImportForm(forms.ModelForm):
264
    class Meta:
265
        model = Desk
266
        fields = []
267

  
268 264
    ics_file = forms.FileField(
269 265
        label=_('ICS File'),
270 266
        required=False,
......
276 272
        help_text=_('URL to remote calendar which will be synchronised hourly.'),
277 273
    )
278 274

  
275
    class Meta:
276
        model = Desk
277
        fields = []
278

  
279
    def clean(self, *args, **kwargs):
280
        cleaned_data = super().clean(*args, **kwargs)
281
        if not cleaned_data.get('ics_file') and not cleaned_data.get('ics_url'):
282
            raise forms.ValidationError(_('Please provide an ICS File or an URL.'))
283

  
279 284

  
280 285
class AgendasImportForm(forms.Form):
281 286
    agendas_json = forms.FileField(label=_('Agendas Export File'))
chrono/manager/templates/chrono/manager_import_exceptions.html
13 13
{% block content %}
14 14

  
15 15
<form method="post" enctype="multipart/form-data">
16
  <p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p>
16
  {% if exception_sources %}
17
  <table class="main">
18
    <thead>
19
      <tr>
20
        <th>{% trans "Exceptions" %}</th>
21
        <th></th>
22
        <th></th>
23
      </tr>
24
    </thead>
25
    <tbody>
26
      {% for object in exception_sources %}
27
      <tr>
28
        <td>
29
          <span title="{{ object }}">
30
            {% if object.ics_filename %}{{ object|truncatechars:50 }}{% else %}<a href="{{ object }}">{{ object|truncatechars:50 }}</a>{% endif %}
31
          </span>
32
        </td>
33
        <td>
34
          <a rel="popup" href="">
35
            {% if object.ics_filename %}{% trans "replace" %}{% else %}{% trans "refresh" %}{% endif %}
36
          </a>
37
        </td>
38
        <td><a rel="popup" href="">{% trans "remove" %}</a></td>
39
      </tr>
40
      {% endfor %}
41
    </tbody>
42
   </table>
43
   <br />
44
  {% endif %}
45

  
46
  <p class="notice">{% trans "To add new exceptions, you can upload a file or specify an address to a remote calendar." %}</p>
17 47
  {% csrf_token %}
18 48
  {{ form.as_p }}
19 49
  <p>
chrono/manager/views.py
20 20
from django.contrib import messages
21 21
from django.core.exceptions import PermissionDenied
22 22
from django.db.models import Q
23
from django.forms import ValidationError
23 24
from django.http import Http404, HttpResponse, HttpResponseRedirect
24 25
from django.shortcuts import get_object_or_404
25 26
from django.urls import reverse, reverse_lazy
......
881 882
    form_class = ExceptionsImportForm
882 883
    template_name = 'chrono/manager_import_exceptions.html'
883 884

  
884
    def get_initial(self):
885
        return {'ics_url': self.get_object().timeperiod_exceptions_remote_url}
885
    def get_context_data(self, **kwargs):
886
        context = super(DeskImportTimePeriodExceptionsView, self).get_context_data(**kwargs)
887
        context['exception_sources'] = self.get_object().timeperiodexceptionsource_set.all()
888
        return context
886 889

  
887 890
    def form_valid(self, form):
888 891
        exceptions = None
889 892
        try:
890 893
            if form.cleaned_data['ics_file']:
891
                ics_file_content = force_text(form.cleaned_data['ics_file'].read())
892
                exceptions = form.instance.create_timeperiod_exceptions_from_ics(ics_file_content)
894
                exceptions = form.instance.import_timeperiod_exceptions_from_ics_file(
895
                    form.cleaned_data['ics_file']
896
                )
893 897
            elif form.cleaned_data['ics_url']:
894
                exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(
898
                exceptions = form.instance.import_timeperiod_exceptions_from_remote_ics(
895 899
                    form.cleaned_data['ics_url']
896 900
                )
897
            else:
898
                form.instance.remove_timeperiod_exceptions_from_remote_ics()
899 901
        except ICSError as e:
900 902
            form.add_error(None, force_text(e))
901 903
            return self.form_invalid(form)
902
        form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url']
903
        form.instance.save()
904

  
904 905
        if exceptions is not None:
905 906
            message = ungettext(
906 907
                'An exception has been imported.', '%(count)d exceptions have been imported.', exceptions
tests/test_agendas.py
1 1
import datetime
2
import re
3 2

  
4 3
import mock
5 4
import pytest
6 5
import requests
7 6
from django.contrib.auth.models import Group
7
from django.core.files.base import ContentFile
8 8
from django.core.management import call_command
9 9
from django.utils.timezone import localtime, make_aware, now
10 10

  
11
from chrono.agendas.models import Agenda, Booking, Desk, Event, ICSError, MeetingType, TimePeriodException
11
from chrono.agendas.models import (
12
    Agenda,
13
    Booking,
14
    Desk,
15
    Event,
16
    ICSError,
17
    MeetingType,
18
    TimePeriodException,
19
    TimePeriodExceptionSource,
20
)
12 21

  
13 22
pytestmark = pytest.mark.django_db
14 23

  
......
17 26
PRODID:-//foo.bar//EN
18 27
BEGIN:VEVENT
19 28
DTSTAMP:20170824T082855Z
20
UID:8c4c219889d244232c0a565f4950c3ff65dd5d64
21 29
DTSTART:20170831T170800Z
22 30
DTEND:20170831T203400Z
23 31
SEQUENCE:1
......
25 33
END:VEVENT
26 34
BEGIN:VEVENT
27 35
DTSTAMP:20170824T092855Z
28
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
29 36
DTSTART:20170830T180800Z
30 37
DTEND:20170831T223400Z
31 38
SEQUENCE:2
......
37 44
PRODID:-//foo.bar//EN
38 45
BEGIN:VEVENT
39 46
DTSTAMP:20170824T082855Z
40
UID:8c4c219889d244232c0a565f4950c3ff65dd5d64
41 47
DTSTART:20170831T170800Z
42 48
DURATION:PT3H26M
43 49
SEQUENCE:1
......
45 51
END:VEVENT
46 52
BEGIN:VEVENT
47 53
DTSTAMP:20170824T092855Z
48
UID:950c3ff889d2465dd5d648c4c2194232c0a565f4
49 54
DTSTART:20170830T180800Z
50 55
DURATION:P1D4H26M
51 56
SEQUENCE:2
......
72 77
END:VEVENT
73 78
END:VCALENDAR"""
74 79

  
75
ICS_SAMPLE_WITH_RECURRENT_EVENT_2 = """BEGIN:VCALENDAR
76
VERSION:2.0
77
PRODID:-//foo.bar//EN
78
BEGIN:VEVENT
79
DTSTAMP:20170720T145803Z
80
DESCRIPTION:Vacances d'ete
81
DTSTART;VALUE=DATE:20180101
82
DTEND;VALUE=DATE:20180101
83
SUMMARY:reccurent event
84
END:VEVENT
85
BEGIN:VEVENT
86
DTSTAMP:20170824T082855Z
87
DTSTART:20180102
88
DTEND:20180101
89
SUMMARY:New Year's Eve
90
RRULE:FREQ=YEARLY;COUNT=1
91
END:VEVENT
92
END:VCALENDAR"""
93

  
94 80
ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST = """BEGIN:VCALENDAR
95 81
VERSION:2.0
96 82
PRODID:-//foo.bar//EN
......
111 97
INVALID_ICS_SAMPLE = """content
112 98
"""
113 99

  
100

  
114 101
with open('tests/data/atreal.ics') as f:
115 102
    ICS_ATREAL = f.read()
116 103

  
......
202 189
    agenda.save()
203 190
    desk = Desk(label='Test 1 desk', agenda=agenda)
204 191
    desk.save()
205
    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE)
192
    exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
193
        ContentFile(ICS_SAMPLE, name='sample.ics')
194
    )
206 195
    assert exceptions_count == 2
207 196
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
208 197

  
......
218 207
        if line.startswith('DTSTART:'):
219 208
            continue
220 209
        lines.append(line)
221
    ics_sample = "\n".join(lines)
210
    ics_sample = ContentFile("\n".join(lines), name='sample.ics')
222 211
    with pytest.raises(ICSError) as e:
223
        exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
212
        desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
224 213
    assert 'Event "Event 1" has no start date.' == str(e.value)
225 214

  
226 215

  
......
235 224
        if line.startswith('DTEND:'):
236 225
            continue
237 226
        lines.append(line)
238
    ics_sample = "\n".join(lines)
239
    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
227
    ics_sample = ContentFile("\n".join(lines), name='sample.ics')
228
    desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
240 229
    for exception in TimePeriodException.objects.filter(desk=desk):
241 230
        end_time = localtime(exception.end_datetime).time()
242 231
        assert end_time == datetime.time(23, 59, 59, 999999)
......
248 237
    agenda.save()
249 238
    desk = Desk(label='Test 4 desk', agenda=agenda)
250 239
    desk.save()
251
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 3
252
    assert TimePeriodException.objects.filter(desk=desk).count() == 3
253
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
254
        [make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
255
    )
256
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 0
257
    # verify occurences are cleaned when count changed
258
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_2) == 0
259
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
260
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
261
        [make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2018, 1, 2))]
240
    assert (
241
        desk.import_timeperiod_exceptions_from_ics_file(
242
            ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics')
243
        )
244
        == 3
262 245
    )
246
    assert TimePeriodException.objects.filter(desk=desk).count() == 3
263 247

  
264 248

  
265 249
def test_timeexception_creation_from_ics_with_dates():
......
273 257
        if line.startswith('RRULE:'):
274 258
            continue
275 259
        lines.append(line)
276
    ics_sample = "\n".join(lines)
277
    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
260
    ics_sample = ContentFile("\n".join(lines), name='sample.ics')
261
    exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
278 262
    assert exceptions_count == 2
279 263
    for exception in TimePeriodException.objects.filter(desk=desk):
280 264
        assert localtime(exception.start_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0))
......
287 271
    desk = Desk(label='Test 6 desk', agenda=agenda)
288 272
    desk.save()
289 273
    with pytest.raises(ICSError) as e:
290
        exceptions_count = desk.create_timeperiod_exceptions_from_ics(INVALID_ICS_SAMPLE)
274
        desk.import_timeperiod_exceptions_from_ics_file(ContentFile(INVALID_ICS_SAMPLE, name='sample.ics'))
291 275
    assert str(e.value) == 'File format is invalid.'
292 276

  
293 277

  
......
297 281
    desk = Desk(label='Test 7 desk', agenda=agenda)
298 282
    desk.save()
299 283
    with pytest.raises(ICSError) as e:
300
        exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS)
284
        desk.import_timeperiod_exceptions_from_ics_file(
285
            ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics')
286
        )
301 287
    assert str(e.value) == "The file doesn't contain any events."
302 288

  
303 289

  
......
310 296
    mocked_response = mock.Mock()
311 297
    mocked_response.text = ICS_SAMPLE
312 298
    mocked_get.return_value = mocked_response
313
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
299
    exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
314 300
    assert exceptions_count == 2
315
    mocked_response.text = re.sub('SUMMARY:\w+', 'SUMMARY:New summmary', ICS_SAMPLE, re.MULTILINE)
316
    mocked_get.return_value = mocked_response
317
    desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
318
    for timeperiod in TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id, desk=desk):
319
        assert 'New summary ' in timeperiod.label
320 301

  
321 302
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
322 303
    mocked_get.return_value = mocked_response
323
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
324
    assert exceptions_count == 0
325
    TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id).count() == 0
304
    with pytest.raises(ICSError) as e:
305
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
306
    assert str(e.value) == "The file doesn't contain any events."
326 307

  
327 308

  
328 309
@mock.patch('chrono.agendas.models.requests.get')
......
340 321

  
341 322
    mocked_get.side_effect = mocked_requests_connection_error
342 323
    with pytest.raises(ICSError) as e:
343
        exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
324
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
344 325
    assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)."
345 326

  
346 327

  
......
360 341
    mocked_get.side_effect = mocked_requests_http_forbidden_error
361 342

  
362 343
    with pytest.raises(ICSError) as e:
363
        exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
344
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
364 345
    assert (
365 346
        str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)."
366 347
    )
......
370 351
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
371 352
    agenda = Agenda(label=u'Test 11 agenda')
372 353
    agenda.save()
373
    desk = Desk(
374
        label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http://example.com/sample.ics'
375
    )
354
    desk = Desk(label='Test 11 desk', agenda=agenda)
376 355
    desk.save()
356
    TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
377 357
    mocked_response = mock.Mock()
378 358
    mocked_response.status_code = 403
379 359
    mocked_get.return_value = mocked_response
......
390 370
    )
391 371

  
392 372

  
393
@mock.patch('chrono.agendas.models.requests.get')
394
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
395
    agenda = Agenda(label=u'Test 11 agenda')
396
    agenda.save()
397
    desk = Desk(
398
        label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics'
399
    )
400
    desk.save()
401
    mocked_response = mock.Mock()
402
    mocked_response.text = ICS_SAMPLE
403
    mocked_get.return_value = mocked_response
404
    call_command('sync_desks_timeperiod_exceptions')
405
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
406
    mocked_response.text = """BEGIN:VCALENDAR
407
VERSION:2.0
408
PRODID:-//foo.bar//EN
409
BEGIN:VEVENT
410
DTSTAMP:20180824T082855Z
411
UID:new-and-unique-uid
412
DTSTART:20180831T170800Z
413
DTEND:20180831T203400Z
414
SUMMARY:Wonderfull event
415
END:VEVENT
416
END:VCALENDAR"""
417
    mocked_get.return_value = mocked_response
418
    call_command('sync_desks_timeperiod_exceptions')
419
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
420
    exception = TimePeriodException.objects.get(desk=desk)
421
    assert exception.external_id == 'new-and-unique-uid'
422
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
423
    mocked_get.return_value = mocked_response
424
    call_command('sync_desks_timeperiod_exceptions')
425
    assert not TimePeriodException.objects.filter(desk=desk).exists()
426

  
427

  
428 373
def test_base_meeting_duration():
429 374
    agenda = Agenda(label='Meeting', kind='meetings')
430 375
    agenda.save()
......
452 397
    agenda.save()
453 398
    desk = Desk(label='Test 1 desk', agenda=agenda)
454 399
    desk.save()
455
    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_DURATION)
400
    exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
401
        ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
402
    )
456 403
    assert exceptions_count == 2
457 404
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
458 405
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
......
477 424
    agenda.save()
478 425
    desk = Desk(label='Test 4 desk', agenda=agenda)
479 426
    desk.save()
480
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 2
427
    assert (
428
        desk.import_timeperiod_exceptions_from_ics_file(
429
            ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics')
430
        )
431
        == 2
432
    )
481 433
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
482 434
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set(
483 435
        [make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))]
484 436
    )
485
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 0
486 437

  
487 438

  
488 439
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
......
490 441
    agenda.save()
491 442
    desk = Desk(label='Test atreal desk', agenda=agenda)
492 443
    desk.save()
493
    assert desk.create_timeperiod_exceptions_from_ics(ICS_ATREAL)
444
    assert desk.import_timeperiod_exceptions_from_ics_file(ContentFile(ICS_ATREAL, name='sample.ics'))
494 445

  
495 446

  
496 447
def test_management_role_deletion():
tests/test_manager.py
15 15
from django.utils.timezone import localtime, make_aware, now
16 16
from webtest import Upload
17 17

  
18
from chrono.agendas.models import Agenda, Booking, Desk, Event, MeetingType, TimePeriod, TimePeriodException
19
from chrono.manager.utils import export_site
20
from chrono.wsgi import application
18
from chrono.agendas.models import (
19
    Agenda,
20
    Booking,
21
    Desk,
22
    Event,
23
    MeetingType,
24
    TimePeriod,
25
    TimePeriodException,
26
    TimePeriodExceptionSource,
27
)
21 28

  
22 29
pytestmark = pytest.mark.django_db
23 30

  
......
1158 1165
    resp = resp.click('Settings')
1159 1166
    assert 'Import exceptions from .ics' in resp.text
1160 1167
    resp = resp.click('upload')
1161
    assert "You can upload a file or specify an address to a remote calendar." in resp
1162
    resp = resp.form.submit(status=302)
1168
    assert "To add new exceptions, you can upload a file or specify an address to a remote calendar." in resp
1169
    resp = resp.form.submit(status=200)
1170
    assert 'Please provide an ICS File or an URL.' in resp.text
1163 1171
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1164 1172
    resp = resp.click('Settings')
1165 1173
    resp = resp.click('upload')
......
1200 1208
    resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
1201 1209
    resp = resp.form.submit(status=302)
1202 1210
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
1211
    assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
1212
    source = TimePeriodExceptionSource.objects.latest('pk')
1213
    exception = TimePeriodException.objects.latest('pk')
1214
    assert exception.source == source
1215
    assert source.ics_filename == 'exceptions.ics'
1216
    assert source.ics_url is None
1203 1217
    resp = resp.follow()
1204 1218
    assert 'An exception has been imported.' in resp.text
1205 1219

  
......
1257 1271
VERSION:2.0
1258 1272
PRODID:-//foo.bar//EN
1259 1273
BEGIN:VEVENT
1260
UID:random-event-id
1261 1274
DTSTART:20180101
1262 1275
DTEND:20180101
1263 1276
SUMMARY:New Year's Eve
......
1267 1280
    resp = resp.form.submit(status=302)
1268 1281
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
1269 1282
    exception = TimePeriodException.objects.get(desk=desk)
1270
    assert exception.external_id == 'random-event-id'
1271
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1272
    resp = resp.click('Settings')
1273
    resp = resp.click('upload')
1274
    resp.form['ics_url'] = ''
1275
    resp = resp.form.submit(status=302)
1276
    assert not TimePeriodException.objects.filter(
1277
        desk=desk, external_id='desk-%s:random-event-id' % desk.id
1278
    ).exists()
1283
    assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
1284
    source = TimePeriodExceptionSource.objects.latest('pk')
1285
    exception = TimePeriodException.objects.latest('pk')
1286
    assert exception.source == source
1287
    assert source.ics_filename is None
1288
    assert source.ics_url == 'http://example.com/foo.ics'
1279 1289

  
1280 1290

  
1281 1291
@mock.patch('chrono.agendas.models.requests.get')
......
1301 1311
VERSION:2.0
1302 1312
PRODID:-//foo.bar//EN
1303 1313
BEGIN:VEVENT
1304
UID:random-event-id
1305 1314
DTSTART:20180101
1306 1315
DTEND:20180101
1307 1316
SUMMARY:New Year's Eve
1308 1317
END:VEVENT
1309
END:VCALENDAR"""
1310
    mocked_get.return_value = mocked_response
1311
    resp = resp.form.submit(status=302)
1312
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
1313
    exception = TimePeriodException.objects.get(desk=desk)
1314
    assert exception.external_id == 'random-event-id'
1315
    mocked_response.text = """BEGIN:VCALENDAR
1316
VERSION:2.0
1317
PRODID:-//foo.bar//EN
1318
END:VCALENDAR"""
1319
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1320
    resp = resp.click('Settings')
1321
    resp = resp.click('upload')
1322
    resp = resp.form.submit(status=302)
1323
    assert not TimePeriodException.objects.filter(desk=desk, external_id='random-event-id').exists()
1324

  
1325

  
1326
@mock.patch('chrono.agendas.models.requests.get')
1327
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
1328
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
1329
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
1330
    MeetingType(agenda=agenda, label='Bar').save()
1331
    login(app)
1332
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1333
    resp = resp.click('Settings')
1334
    assert 'Import exceptions from .ics' not in resp.text
1335

  
1336
    TimePeriod.objects.create(
1337
        weekday=1, desk=desk, start_time=datetime.time(10, 0), end_time=datetime.time(12, 0)
1338
    )
1339

  
1340
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1341
    resp = resp.click('Settings')
1342
    resp = resp.click('upload')
1343
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1344
    mocked_response = mock.Mock()
1345
    mocked_response.text = """BEGIN:VCALENDAR
1346
VERSION:2.0
1347
PRODID:-//foo.bar//EN
1348
BEGIN:VEVENT
1349
UID:first-eventrandom-event-id
1350
DTSTART:20180101
1351
DTEND:20180101
1352
SUMMARY:First test event
1353
END:VEVENT
1354
BEGIN:VEVENT
1355
UID:second-eventrandom-event-id
1356
DTSTART:20190101
1357
DTEND:20190101
1358
SUMMARY:Second test event
1359
END:VEVENT
1360
END:VCALENDAR"""
1361
    mocked_get.return_value = mocked_response
1362
    resp = resp.form.submit(status=302)
1363
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
1364
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1365
    resp = resp.click('Settings')
1366
    resp = resp.click('upload')
1367
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1368
    mocked_response.text = """BEGIN:VCALENDAR
1369
VERSION:2.0
1370
PRODID:-//foo.bar//EN
1371
BEGIN:VEVENT
1372
UID:secord-eventrandom-event-id
1373
DTSTART:20190101
1374
DTEND:20190101
1375
SUMMARY:Second test event
1376
END:VEVENT
1377 1318
END:VCALENDAR"""
1378 1319
    mocked_get.return_value = mocked_response
1379 1320
    resp = resp.form.submit(status=302)
1380
-