Projet

Général

Profil

0002-agendas-add-exception-source-model-29209.patch

Lauréline Guérin, 12 décembre 2019 10:57

Télécharger (32,2 ko)

Voir les différences:

Subject: [PATCH 2/4] agendas: add exception source model (#29209)

 .../sync_desks_timeperiod_exceptions.py       |  8 +-
 .../0033_timeperiodexceptionsource.py         | 34 +++++++
 .../agendas/migrations/0034_initial_source.py | 42 +++++++++
 ...e_desk_timeperiod_exceptions_remote_url.py | 19 ++++
 chrono/agendas/models.py                      | 52 ++++++-----
 chrono/manager/forms.py                       |  4 +-
 .../chrono/manager_import_exceptions.html     | 32 ++++++-
 chrono/manager/views.py                       | 17 ++--
 tests/test_agendas.py                         | 81 +++++++---------
 tests/test_manager.py                         | 92 ++++---------------
 10 files changed, 221 insertions(+), 160 deletions(-)
 create mode 100644 chrono/agendas/migrations/0033_timeperiodexceptionsource.py
 create mode 100644 chrono/agendas/migrations/0034_initial_source.py
 create mode 100644 chrono/agendas/migrations/0035_remove_desk_timeperiod_exceptions_remote_url.py
chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py
18 18

  
19 19
import sys
20 20

  
21
from chrono.agendas.models import Desk, ICSError
21
from chrono.agendas.models import TimePeriodExceptionSource, ICSError
22 22
from django.core.management.base import BaseCommand
23 23

  
24 24

  
......
26 26
    help = 'Synchronize time period exceptions from desks remote ics'
27 27

  
28 28
    def handle(self, **options):
29
        for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
29
        for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
30 30
            try:
31
                desk.create_timeperiod_exceptions_from_remote_ics(desk.timeperiod_exceptions_remote_url)
31
                source.desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source)
32 32
            except ICSError as e:
33
                print(u'unable to create timeperiod exceptions for "%s": %s' % (desk, e), file=sys.stderr)
33
                print(u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr)
chrono/agendas/migrations/0033_timeperiodexceptionsource.py
1
# -*- coding: utf-8 -*-
2
from __future__ import unicode_literals
3

  
4
from django.db import migrations, models
5
import django.db.models.deletion
6

  
7

  
8
class Migration(migrations.Migration):
9

  
10
    dependencies = [
11
        ('agendas', '0032_auto_20191127_0919'),
12
    ]
13

  
14
    operations = [
15
        migrations.CreateModel(
16
            name='TimePeriodExceptionSource',
17
            fields=[
18
                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
19
                ('ics_filename', models.CharField(max_length=256, null=True)),
20
                ('ics_url', models.URLField(null=True, max_length=500)),
21
                ('desk', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='agendas.Desk')),
22
            ],
23
        ),
24
        migrations.AddField(
25
            model_name='timeperiodexception',
26
            name='source',
27
            field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='agendas.TimePeriodExceptionSource'),
28
        ),
29
        migrations.AlterField(
30
            model_name='desk',
31
            name='timeperiod_exceptions_remote_url',
32
            field=models.URLField(blank=True, max_length=500, null=True, verbose_name='URL to fetch time period exceptions from'),
33
        ),
34
    ]
chrono/agendas/migrations/0034_initial_source.py
1
# -*- coding: utf-8 -*-
2
from __future__ import unicode_literals
3

  
4
from django.db import migrations
5

  
6

  
7
def create_source(apps, schema_editor):
8
    Desk = apps.get_model('agendas', 'Desk')
9
    TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
10
    for desk in Desk.objects.exclude(timeperiod_exceptions_remote_url=''):
11
        # create a source for each remote url
12
        source = TimePeriodExceptionSource.objects.create(
13
            desk=desk,
14
            ics_url=desk.timeperiod_exceptions_remote_url)
15
        # clear timeperiod_exceptions_remote_url
16
        desk.timeperiod_exceptions_remote_url = None
17
        desk.save()
18
        # attach exceptions to the created source
19
        desk.timeperiodexception_set.exclude(external_id='').update(source=source)
20

  
21

  
22
def init_remote_url(apps, schema_editor):
23
    TimePeriodExceptionSource = apps.get_model('agendas', 'TimePeriodExceptionSource')
24
    for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
25
        # set timeperiod_exceptions_remote_url
26
        source.desk.timeperiod_exceptions_remote_url = source.ics_url
27
        source.desk.save()
28
        # unlink exceptions
29
        source.timeperiodexception_set.update(source=None)
30
        # delete the source
31
        source.delete()
32

  
33

  
34
class Migration(migrations.Migration):
35

  
36
    dependencies = [
37
        ('agendas', '0033_timeperiodexceptionsource'),
38
    ]
39

  
40
    operations = [
41
        migrations.RunPython(create_source, init_remote_url),
42
    ]
chrono/agendas/migrations/0035_remove_desk_timeperiod_exceptions_remote_url.py
1
# -*- coding: utf-8 -*-
2
# Generated by Django 1.11.18 on 2019-12-09 14:24
3
from __future__ import unicode_literals
4

  
5
from django.db import migrations
6

  
7

  
8
class Migration(migrations.Migration):
9

  
10
    dependencies = [
11
        ('agendas', '0034_initial_source'),
12
    ]
13

  
14
    operations = [
15
        migrations.RemoveField(
16
            model_name='desk',
17
            name='timeperiod_exceptions_remote_url',
18
        ),
19
    ]
chrono/agendas/models.py
436 436
    agenda = models.ForeignKey(Agenda, on_delete=models.CASCADE)
437 437
    label = models.CharField(_('Label'), max_length=150)
438 438
    slug = models.SlugField(_('Identifier'), max_length=160)
439
    timeperiod_exceptions_remote_url = models.URLField(
440
            _('URL to fetch time period exceptions from'),
441
            blank=True, max_length=500)
442 439

  
443 440
    def __str__(self):
444 441
        return self.label
......
493 490
        in_two_weeks = self.get_exceptions_within_two_weeks()
494 491
        return self.timeperiodexception_set.count() == len(in_two_weeks)
495 492

  
496
    def create_timeperiod_exceptions_from_remote_ics(self, url):
493
    def import_timeperiod_exceptions_from_remote_ics(self, ics_url, source=None):
497 494
        try:
498
            response = requests.get(url, proxies=settings.REQUESTS_PROXIES)
495
            response = requests.get(ics_url, proxies=settings.REQUESTS_PROXIES)
499 496
            response.raise_for_status()
500 497
        except requests.HTTPError as e:
501 498
            raise ICSError(
502 499
                _('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).') %
503
                {'url': url, 'status_code': e.response.status_code})
500
                {'url': ics_url, 'status_code': e.response.status_code})
504 501
        except requests.RequestException as e:
505 502
            raise ICSError(
506 503
                _('Failed to retrieve remote calendar (%(url)s, %(exception)s).') %
507
                {'url': url, 'exception': e})
504
                {'url': ics_url, 'exception': e})
508 505

  
509
        return self.create_timeperiod_exceptions_from_ics(response.text, keep_synced_by_uid=True)
506
        if source is None:
507
            source = TimePeriodExceptionSource(desk=self, ics_url=ics_url)
508
        return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text, keep_synced_by_uid=True)
510 509

  
511
    def remove_timeperiod_exceptions_from_remote_ics(self):
512
        TimePeriodException.objects.filter(desk=self).exclude(external_id='').delete()
510
    def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None):
511
        if source is None:
512
            source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name)
513
        return self._import_timeperiod_exceptions_from_ics(source=source, data=ics_file.read())
513 514

  
514
    def create_timeperiod_exceptions_from_ics(self, data, keep_synced_by_uid=False, recurring_days=600):
515
    def _import_timeperiod_exceptions_from_ics(self, source, data, keep_synced_by_uid=False, recurring_days=600):
515 516
        try:
516 517
            parsed = vobject.readOne(data)
517 518
        except vobject.base.ParseError:
......
523 524
            raise ICSError(_('The file doesn\'t contain any events.'))
524 525

  
525 526
        with transaction.atomic():
527
            if source.pk is None:
528
                source.save()
529
            # delete old exceptions related to this source
530
            source.timeperiodexception_set.all().delete()
531
            # create new exceptions
526 532
            update_datetime = now()
527 533
            for vevent in parsed.contents.get('vevent', []):
528 534
                if 'summary' in vevent.contents:
......
560 566

  
561 567
                kwargs = {}
562 568
                kwargs['desk'] = self
569
                kwargs['source'] = source
563 570
                kwargs['recurrence_id'] = 0
564 571
                if keep_synced_by_uid:
565 572
                    kwargs['external_id'] = vevent.contents['uid'][0].value
......
593 600
                            obj, created = TimePeriodException.objects.update_or_create(defaults=event, **kwargs)
594 601
                            if created:
595 602
                                total_created += 1
596
                    # delete unseen occurrences
597
                    kwargs.pop('recurrence_id', None)
598
                    TimePeriodException.objects.filter(recurrence_id__gt=i, **kwargs).delete()
599

  
600
            if keep_synced_by_uid:
601
                # delete all outdated exceptions from remote calendar
602
                TimePeriodException.objects.filter(
603
                    update_datetime__lt=update_datetime,
604
                    desk=self
605
                ).exclude(external_id='').delete()
606 603

  
607 604
        return total_created
608 605

  
......
623 620
        return openslots.search(aware_date, aware_next_date)
624 621

  
625 622

  
623
@python_2_unicode_compatible
624
class TimePeriodExceptionSource(models.Model):
625
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
626
    ics_filename = models.CharField(null=True, max_length=256)
627
    ics_url = models.URLField(null=True, max_length=500)
628

  
629
    def __str__(self):
630
        if self.ics_filename is not None:
631
            return self.ics_filename
632
        return self.ics_url
633

  
634

  
626 635
@python_2_unicode_compatible
627 636
class TimePeriodException(models.Model):
628 637
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
638
    source = models.ForeignKey(TimePeriodExceptionSource, on_delete=models.CASCADE, null=True)
629 639
    external_id = models.CharField(_('External ID'), max_length=256, blank=True)
630 640
    label = models.CharField(_('Optional Label'), max_length=150, blank=True, null=True)
631 641
    start_datetime = models.DateTimeField(_('Exception start time'))
chrono/manager/forms.py
137 137
        widgets = {
138 138
            'agenda': forms.HiddenInput(),
139 139
        }
140
        exclude = ['slug', 'timeperiod_exceptions_remote_url']
140
        exclude = ['slug']
141 141

  
142 142

  
143 143
class DeskForm(forms.ModelForm):
......
146 146
        widgets = {
147 147
            'agenda': forms.HiddenInput(),
148 148
        }
149
        exclude = ['timeperiod_exceptions_remote_url']
149
        exclude = []
150 150

  
151 151

  
152 152
class TimePeriodExceptionForm(forms.ModelForm):
chrono/manager/templates/chrono/manager_import_exceptions.html
13 13
{% block content %}
14 14

  
15 15
<form method="post" enctype="multipart/form-data">
16
  <p class="notice">{% trans "You can upload a file or specify an address to a remote calendar." %}</p>
16
  {% if exception_sources %}
17
  <table class="main">
18
    <thead>
19
      <tr>
20
        <th>{% trans "Exceptions" %}</th>
21
        <th></th>
22
        <th></th>
23
      </tr>
24
    </thead>
25
    <tbody>
26
      {% for object in exception_sources %}
27
      <tr>
28
        <td>
29
          <span title="{{ object }}">
30
            {% if object.ics_filename %}{{ object|truncatechars:50 }}{% else %}<a href="{{ object }}">{{ object|truncatechars:50 }}</a>{% endif %}
31
          </span>
32
        </td>
33
        <td>
34
          <a rel="popup" href="">
35
            {% if object.ics_filename %}{% trans "replace" %}{% else %}{% trans "refresh" %}{% endif %}
36
          </a>
37
        </td>
38
        <td><a rel="popup" href="">{% trans "remove" %}</a></td>
39
      </tr>
40
      {% endfor %}
41
    </tbody>
42
   </table>
43
   <br />
44
  {% endif %}
45

  
46
  <p class="notice">{% trans "To add new exceptions, you can upload a file or specify an address to a remote calendar." %}</p>
17 47
  {% csrf_token %}
18 48
  {{ form.as_p }}
19 49
  <p>
chrono/manager/views.py
802 802
    form_class = ExceptionsImportForm
803 803
    template_name = 'chrono/manager_import_exceptions.html'
804 804

  
805
    def get_initial(self):
806
        return {'ics_url': self.get_object().timeperiod_exceptions_remote_url}
805
    def get_context_data(self, **kwargs):
806
        context = super(DeskImportTimePeriodExceptionsView, self).get_context_data(**kwargs)
807
        context['exception_sources'] = self.get_object().timeperiodexceptionsource_set.all()
808
        return context
807 809

  
808 810
    def form_valid(self, form):
809 811
        exceptions = None
810 812
        try:
811 813
            if form.cleaned_data['ics_file']:
812
                ics_file_content = force_text(form.cleaned_data['ics_file'].read())
813
                exceptions = form.instance.create_timeperiod_exceptions_from_ics(ics_file_content)
814
                exceptions = form.instance.import_timeperiod_exceptions_from_ics_file(form.cleaned_data['ics_file'])
814 815
            elif form.cleaned_data['ics_url']:
815
                exceptions = form.instance.create_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url'])
816
                exceptions = form.instance.import_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url'])
816 817
            else:
817
                form.instance.remove_timeperiod_exceptions_from_remote_ics()
818
                form.add_error(None, _('Please provide an ICS File or an URL.'))
819
                return self.form_invalid(form)
818 820
        except ICSError as e:
819 821
            form.add_error(None, force_text(e))
820 822
            return self.form_invalid(form)
821
        form.instance.timeperiod_exceptions_remote_url = form.cleaned_data['ics_url']
822
        form.instance.save()
823

  
823 824
        if exceptions is not None:
824 825
            message = ungettext('An exception has been imported.',
825 826
                                '%(count)d exceptions have been imported.', exceptions)
tests/test_agendas.py
7 7

  
8 8
from django.utils.timezone import now, make_aware, localtime
9 9
from django.contrib.auth.models import Group
10
from django.core.files.base import ContentFile
10 11
from django.core.management import call_command
11 12

  
12 13
from chrono.agendas.models import (
13 14
    Agenda, Event, Booking, MeetingType,
14
    Desk, TimePeriodException, ICSError)
15
    Desk, TimePeriodException, TimePeriodExceptionSource, ICSError)
15 16

  
16 17
pytestmark = pytest.mark.django_db
17 18

  
......
75 76
END:VEVENT
76 77
END:VCALENDAR"""
77 78

  
78
ICS_SAMPLE_WITH_RECURRENT_EVENT_2 = """BEGIN:VCALENDAR
79
VERSION:2.0
80
PRODID:-//foo.bar//EN
81
BEGIN:VEVENT
82
DTSTAMP:20170720T145803Z
83
DESCRIPTION:Vacances d'ete
84
DTSTART;VALUE=DATE:20180101
85
DTEND;VALUE=DATE:20180101
86
SUMMARY:reccurent event
87
END:VEVENT
88
BEGIN:VEVENT
89
DTSTAMP:20170824T082855Z
90
DTSTART:20180102
91
DTEND:20180101
92
SUMMARY:New Year's Eve
93
RRULE:FREQ=YEARLY;COUNT=1
94
END:VEVENT
95
END:VCALENDAR"""
96

  
97 79
ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST = """BEGIN:VCALENDAR
98 80
VERSION:2.0
99 81
PRODID:-//foo.bar//EN
......
114 96
INVALID_ICS_SAMPLE = """content
115 97
"""
116 98

  
99

  
117 100
with open('tests/data/atreal.ics') as f:
118 101
    ICS_ATREAL = f.read()
119 102

  
......
205 188
    agenda.save()
206 189
    desk = Desk(label='Test 1 desk', agenda=agenda)
207 190
    desk.save()
208
    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE)
191
    exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
192
        ContentFile(ICS_SAMPLE, name='sample.ics'))
209 193
    assert exceptions_count == 2
210 194
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
211 195

  
......
221 205
        if line.startswith('DTSTART:'):
222 206
            continue
223 207
        lines.append(line)
224
    ics_sample = "\n".join(lines)
208
    ics_sample = ContentFile("\n".join(lines), name='sample.ics')
225 209
    with pytest.raises(ICSError) as e:
226
        desk.create_timeperiod_exceptions_from_ics(ics_sample)
210
        desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
227 211
    assert 'Event "Event 1" has no start date.' == str(e.value)
228 212

  
229 213

  
......
238 222
        if line.startswith('DTEND:'):
239 223
            continue
240 224
        lines.append(line)
241
    ics_sample = "\n".join(lines)
242
    desk.create_timeperiod_exceptions_from_ics(ics_sample)
225
    ics_sample = ContentFile("\n".join(lines), name='sample.ics')
226
    desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
243 227
    for exception in TimePeriodException.objects.filter(desk=desk):
244 228
        end_time = localtime(exception.end_datetime).time()
245 229
        assert end_time == datetime.time(23, 59, 59, 999999)
......
251 235
    agenda.save()
252 236
    desk = Desk(label='Test 4 desk', agenda=agenda)
253 237
    desk.save()
254
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 3
238
    assert desk.import_timeperiod_exceptions_from_ics_file(
239
        ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics')) == 3
255 240
    assert TimePeriodException.objects.filter(desk=desk).count() == 3
256
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set([
257
        make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))])
258
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT) == 0
259
    # verify occurences are cleaned when count changed
260
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_2) == 0
261
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
262
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set([
263
        make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2018, 1, 2))])
264 241

  
265 242

  
266 243
def test_timeexception_creation_from_ics_with_dates():
......
274 251
        if line.startswith('RRULE:'):
275 252
            continue
276 253
        lines.append(line)
277
    ics_sample = "\n".join(lines)
278
    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ics_sample)
254
    ics_sample = ContentFile("\n".join(lines), name='sample.ics')
255
    exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
279 256
    assert exceptions_count == 2
280 257
    for exception in TimePeriodException.objects.filter(desk=desk):
281 258
        assert localtime(exception.start_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0))
......
288 265
    desk = Desk(label='Test 6 desk', agenda=agenda)
289 266
    desk.save()
290 267
    with pytest.raises(ICSError) as e:
291
        desk.create_timeperiod_exceptions_from_ics(INVALID_ICS_SAMPLE)
268
        desk.import_timeperiod_exceptions_from_ics_file(
269
            ContentFile(INVALID_ICS_SAMPLE, name='sample.ics'))
292 270
    assert str(e.value) == 'File format is invalid.'
293 271

  
294 272

  
......
298 276
    desk = Desk(label='Test 7 desk', agenda=agenda)
299 277
    desk.save()
300 278
    with pytest.raises(ICSError) as e:
301
        desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_NO_EVENTS)
279
        desk.import_timeperiod_exceptions_from_ics_file(
280
            ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics'))
302 281
    assert str(e.value) == "The file doesn't contain any events."
303 282

  
304 283

  
......
311 290
    mocked_response = mock.Mock()
312 291
    mocked_response.text = ICS_SAMPLE
313 292
    mocked_get.return_value = mocked_response
314
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
293
    exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
315 294
    assert exceptions_count == 2
316 295
    mocked_response.text = re.sub(r'SUMMARY:\w+', 'SUMMARY:New summmary', ICS_SAMPLE, re.MULTILINE)
317 296
    mocked_get.return_value = mocked_response
318
    desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
297
    desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
319 298
    for timeperiod in TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id, desk=desk):
320 299
        assert 'New summary ' in timeperiod.label
321 300

  
322 301
    mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS
323 302
    mocked_get.return_value = mocked_response
324
    exceptions_count = desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
303
    exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
325 304
    assert exceptions_count == 0
326 305
    TimePeriodException.objects.filter(external_id='desk-%s:' % desk.id).count() == 0
327 306

  
......
341 320

  
342 321
    mocked_get.side_effect = mocked_requests_connection_error
343 322
    with pytest.raises(ICSError) as e:
344
        desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
323
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
345 324
    assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)."
346 325

  
347 326

  
......
361 340
    mocked_get.side_effect = mocked_requests_http_forbidden_error
362 341

  
363 342
    with pytest.raises(ICSError) as e:
364
        desk.create_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
343
        desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
365 344
    assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)."
366 345

  
367 346

  
......
369 348
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys):
370 349
    agenda = Agenda(label=u'Test 11 agenda')
371 350
    agenda.save()
372
    desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http://example.com/sample.ics')
351
    desk = Desk(label='Test 11 desk', agenda=agenda)
373 352
    desk.save()
353
    TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
374 354
    mocked_response = mock.Mock()
375 355
    mocked_response.status_code = 403
376 356
    mocked_get.return_value = mocked_response
......
390 370
def test_sync_desks_timeperiod_exceptions_from_changing_ics(mocked_get, caplog):
391 371
    agenda = Agenda(label=u'Test 11 agenda')
392 372
    agenda.save()
393
    desk = Desk(label='Test 11 desk', agenda=agenda, timeperiod_exceptions_remote_url='http:example.com/sample.ics')
373
    desk = Desk(label='Test 11 desk', agenda=agenda)
394 374
    desk.save()
375
    TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
395 376
    mocked_response = mock.Mock()
396 377
    mocked_response.text = ICS_SAMPLE
397 378
    mocked_get.return_value = mocked_response
......
446 427
    agenda.save()
447 428
    desk = Desk(label='Test 1 desk', agenda=agenda)
448 429
    desk.save()
449
    exceptions_count = desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_DURATION)
430
    exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
431
            ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics'))
450 432
    assert exceptions_count == 2
451 433
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
452 434
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set([
......
467 449
    agenda.save()
468 450
    desk = Desk(label='Test 4 desk', agenda=agenda)
469 451
    desk.save()
470
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 2
452
    assert desk.import_timeperiod_exceptions_from_ics_file(
453
            ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics')) == 2
471 454
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
472 455
    assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set([
473 456
        make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))])
474
    assert desk.create_timeperiod_exceptions_from_ics(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST) == 0
475 457

  
476 458

  
477 459
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal():
......
479 461
    agenda.save()
480 462
    desk = Desk(label='Test atreal desk', agenda=agenda)
481 463
    desk.save()
482
    assert desk.create_timeperiod_exceptions_from_ics(ICS_ATREAL)
464
    assert desk.import_timeperiod_exceptions_from_ics_file(
465
            ContentFile(ICS_ATREAL, name='sample.ics'))
483 466

  
484 467

  
485 468
def test_management_role_deletion():
tests/test_manager.py
15 15
from webtest import Upload
16 16

  
17 17
from chrono.agendas.models import (Agenda, Event, Booking, MeetingType,
18
                                   TimePeriod, Desk, TimePeriodException)
18
                                   TimePeriod, Desk, TimePeriodException,
19
                                   TimePeriodExceptionSource)
19 20

  
20 21
pytestmark = pytest.mark.django_db
21 22

  
......
1142 1143
    resp = resp.click('Settings')
1143 1144
    assert 'Import exceptions from .ics' in resp.text
1144 1145
    resp = resp.click('upload')
1145
    assert "You can upload a file or specify an address to a remote calendar." in resp
1146
    resp = resp.form.submit(status=302)
1146
    assert "To add new exceptions, you can upload a file or specify an address to a remote calendar." in resp
1147
    resp = resp.form.submit(status=200)
1148
    assert 'Please provide an ICS File or an URL.' in resp.text
1147 1149
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1148 1150
    resp = resp.click('Settings')
1149 1151
    resp = resp.click('upload')
......
1184 1186
    resp.form['ics_file'] = Upload('exceptions.ics', ics_with_exceptions, 'text/calendar')
1185 1187
    resp = resp.form.submit(status=302)
1186 1188
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
1189
    assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
1190
    source = TimePeriodExceptionSource.objects.latest('pk')
1191
    exception = TimePeriodException.objects.latest('pk')
1192
    assert exception.source == source
1193
    assert source.ics_filename == 'exceptions.ics'
1194
    assert source.ics_url is None
1187 1195
    resp = resp.follow()
1188 1196
    assert 'An exception has been imported.' in resp.text
1189 1197

  
......
1252 1260
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
1253 1261
    exception = TimePeriodException.objects.get(desk=desk)
1254 1262
    assert exception.external_id == 'random-event-id'
1255
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1256
    resp = resp.click('Settings')
1257
    resp = resp.click('upload')
1258
    resp.form['ics_url'] = ''
1259
    resp = resp.form.submit(status=302)
1260
    assert not TimePeriodException.objects.filter(
1261
        desk=desk,
1262
        external_id='desk-%s:random-event-id' % desk.id).exists()
1263
    assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 1
1264
    source = TimePeriodExceptionSource.objects.latest('pk')
1265
    exception = TimePeriodException.objects.latest('pk')
1266
    assert exception.source == source
1267
    assert source.ics_filename is None
1268
    assert source.ics_url == 'http://example.com/foo.ics'
1263 1269

  
1264 1270

  
1265 1271
@mock.patch('chrono.agendas.models.requests.get')
......
1300 1306
VERSION:2.0
1301 1307
PRODID:-//foo.bar//EN
1302 1308
END:VCALENDAR"""
1303
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1304
    resp = resp.click('Settings')
1305
    resp = resp.click('upload')
1306
    resp = resp.form.submit(status=302)
1307
    assert not TimePeriodException.objects.filter(
1308
        desk=desk,
1309
        external_id='random-event-id').exists()
1310

  
1311

  
1312
@mock.patch('chrono.agendas.models.requests.get')
1313
def test_agenda_update_time_period_exception_from_remote_ics(mocked_get, app, admin_user):
1314
    agenda = Agenda.objects.create(label='New Example', kind='meetings')
1315
    desk = Desk.objects.create(agenda=agenda, label='New Desk')
1316
    MeetingType(agenda=agenda, label='Bar').save()
1317
    login(app)
1318
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1319
    resp = resp.click('Settings')
1320
    assert 'Import exceptions from .ics' not in resp.text
1321

  
1322
    TimePeriod.objects.create(
1323
        weekday=1, desk=desk,
1324
        start_time=datetime.time(10, 0), end_time=datetime.time(12, 0))
1325

  
1326
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1327
    resp = resp.click('Settings')
1328
    resp = resp.click('upload')
1329
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1330
    mocked_response = mock.Mock()
1331
    mocked_response.text = """BEGIN:VCALENDAR
1332
VERSION:2.0
1333
PRODID:-//foo.bar//EN
1334
BEGIN:VEVENT
1335
UID:first-eventrandom-event-id
1336
DTSTART:20180101
1337
DTEND:20180101
1338
SUMMARY:First test event
1339
END:VEVENT
1340
BEGIN:VEVENT
1341
UID:second-eventrandom-event-id
1342
DTSTART:20190101
1343
DTEND:20190101
1344
SUMMARY:Second test event
1345
END:VEVENT
1346
END:VCALENDAR"""
1347
    mocked_get.return_value = mocked_response
1348
    resp = resp.form.submit(status=302)
1349
    assert TimePeriodException.objects.filter(desk=desk).count() == 2
1350
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1351
    resp = resp.click('Settings')
1352
    resp = resp.click('upload')
1353
    resp.form['ics_url'] = 'http://example.com/foo.ics'
1354
    mocked_response.text = """BEGIN:VCALENDAR
1355
VERSION:2.0
1356
PRODID:-//foo.bar//EN
1357
BEGIN:VEVENT
1358
UID:secord-eventrandom-event-id
1359
DTSTART:20190101
1360
DTEND:20190101
1361
SUMMARY:Second test event
1362
END:VEVENT
1363
END:VCALENDAR"""
1364
    mocked_get.return_value = mocked_response
1365
    resp = resp.form.submit(status=302)
1366
    assert TimePeriodException.objects.filter(desk=desk).count() == 1
1367 1309

  
1368 1310

  
1369 1311
@mock.patch('chrono.agendas.models.requests.get')
1370
-