Projet

Général

Profil

0001-agendas-keep-the-file-of-an-exception-source-39259.patch

Lauréline Guérin, 28 janvier 2020 15:10

Télécharger (13,7 ko)

Voir les différences:

Subject: [PATCH] agendas: keep the file of an exception source (#39259)

 .../sync_desks_timeperiod_exceptions.py       | 10 ++++-
 ...0037_timeperiodexceptionsource_ics_file.py | 20 +++++++++
 chrono/agendas/models.py                      | 10 ++++-
 chrono/manager/forms.py                       | 12 +++++-
 .../chrono/manager_replace_exceptions.html    | 14 ++-----
 chrono/manager/views.py                       | 10 ++++-
 tests/test_agendas.py                         | 41 ++++++++++++++++++-
 tests/test_manager.py                         |  9 +++-
 8 files changed, 107 insertions(+), 19 deletions(-)
 create mode 100644 chrono/agendas/migrations/0037_timeperiodexceptionsource_ics_file.py
chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py
27 27
    help = 'Synchronize time period exceptions from desks remote ics'
28 28

  
29 29
    def handle(self, **options):
30
        for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False):
30
        for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=False, ics_file=''):
31 31
            try:
32 32
                source.desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source)
33 33
            except ICSError as e:
34 34
                print(
35 35
                    u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr
36 36
                )
37

  
38
        for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=True).exclude(ics_file=''):
39
            try:
40
                source.desk.import_timeperiod_exceptions_from_ics_file(source.ics_file, source=source)
41
            except ICSError as e:
42
                print(
43
                    u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr
44
                )
chrono/agendas/migrations/0037_timeperiodexceptionsource_ics_file.py
1
# -*- coding: utf-8 -*-
2
from __future__ import unicode_literals
3

  
4
import chrono.agendas.models
5
from django.db import migrations, models
6

  
7

  
8
class Migration(migrations.Migration):
9

  
10
    dependencies = [
11
        ('agendas', '0036_auto_20191223_1758'),
12
    ]
13

  
14
    operations = [
15
        migrations.AddField(
16
            model_name='timeperiodexceptionsource',
17
            name='ics_file',
18
            field=models.FileField(blank=True, null=True, upload_to=chrono.agendas.models.ics_directory_path),
19
        ),
20
    ]
chrono/agendas/models.py
19 19
import math
20 20
import requests
21 21
import vobject
22
import uuid
22 23

  
23 24
import django
24 25
from django.conf import settings
......
583 584
            # often be missing and defaults to iso-8859-15.
584 585
            response.content.decode('utf-8')
585 586
            response.encoding = 'utf-8'
586
        except UnicodeDecodeError as e:
587
        except UnicodeDecodeError:
587 588
            pass
588 589
        return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text)
589 590

  
590 591
    def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None):
591 592
        if source is None:
592
            source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name)
593
            source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name, ics_file=ics_file)
593 594
        return self._import_timeperiod_exceptions_from_ics(source=source, data=force_text(ics_file.read()))
594 595

  
595 596
    def _import_timeperiod_exceptions_from_ics(self, source, data, recurring_days=600):
......
691 692
        return openslots.search(aware_date, aware_next_date)
692 693

  
693 694

  
695
def ics_directory_path(instance, filename):
696
    return 'ics/{0}/{1}'.format(str(uuid.uuid4()), filename)
697

  
698

  
694 699
class TimePeriodExceptionSource(models.Model):
695 700
    desk = models.ForeignKey(Desk, on_delete=models.CASCADE)
696 701
    ics_filename = models.CharField(null=True, max_length=256)
702
    ics_file = models.FileField(upload_to=ics_directory_path, blank=True, null=True)
697 703
    ics_url = models.URLField(null=True, max_length=500)
698 704

  
699 705
    def __str__(self):
chrono/manager/forms.py
263 263
    ics_file = forms.FileField(
264 264
        label=_('ICS File'),
265 265
        required=False,
266
        help_text=_('ICS file containing events which will be considered as exceptions.'),
266
        help_text=_(
267
            'ICS file containing events which will be considered as exceptions. Will be synchronised hourly'
268
        ),
267 269
    )
268 270
    ics_url = forms.URLField(
269 271
        label=_('URL'),
......
282 284

  
283 285

  
284 286
class TimePeriodExceptionSourceReplaceForm(forms.ModelForm):
285
    ics_file = forms.FileField(
287
    ics_newfile = forms.FileField(
286 288
        label=_('ICS File'),
287 289
        required=False,
288 290
        help_text=_('ICS file containing events which will be considered as exceptions.'),
......
292 294
        model = TimePeriodExceptionSource
293 295
        fields = []
294 296

  
297
    def save(self, *args, **kwargs):
298
        if bool(self.instance.ics_file):
299
            self.instance.ics_file.delete()
300
        self.instance.ics_file = self.cleaned_data['ics_newfile']
301
        self.instance.save()
302

  
295 303

  
296 304
class AgendasImportForm(forms.Form):
297 305
    agendas_json = forms.FileField(label=_('Agendas Export File'))
chrono/manager/templates/chrono/manager_replace_exceptions.html
2 2
{% load i18n %}
3 3

  
4 4
{% block appbar %}
5
<h2>{% if form.instance.ics_filename %}{% trans "Replace exceptions" %}{% else %}{% trans "Refresh exceptions" %}{% endif %}</h2>
5
<h2>{% trans "Replace exceptions" %}</h2>
6 6
{% endblock %}
7 7

  
8 8
{% block content %}
9 9
<form method="post" enctype="multipart/form-data">
10
  {% if form.instance.ics_filename %}
11
    <p class="notice">{% trans "To replace existing exceptions, please upload a new file." %}</p>
12
  {% else %}
13
  <p class="notice">
14
    {% trans 'Press the button "Refresh" to refresh existing exceptions from:' %}
15
    <br />
16
    <a href="{{ form.instance.ics_url }}">{{ form.instance.ics_url }}</a>
17
  </p>
18
  {% endif %}
10
  <p class="notice">{% trans "To replace existing exceptions, please upload a new file." %}</p>
19 11
  {% csrf_token %}
20 12
  {{ form.as_p }}
21 13
  <p>
22 14
  </p>
23 15
  <div class="buttons">
24
    <button>{% if form.instance.ics_filename %}{% trans "Replace" %}{% else %}{% trans "Refresh" %}{% endif %}</button>
16
    <button>{% trans "Replace" %}</button>
25 17
    <a class="cancel" href="{% url 'chrono-manager-agenda-settings' pk=agenda.id %}">{% trans 'Cancel' %}</a>
26 18
  </div>
27 19
</form>
chrono/manager/views.py
985 985
    form_class = TimePeriodExceptionSourceReplaceForm
986 986
    template_name = 'chrono/manager_replace_exceptions.html'
987 987

  
988
    def get_queryset(self):
989
        queryset = super(TimePeriodExceptionSourceReplaceView, self).get_queryset()
990
        return queryset.filter(ics_filename__isnull=False)
991

  
988 992
    def form_valid(self, form):
989 993
        exceptions = None
990 994
        try:
991 995
            exceptions = form.instance.desk.import_timeperiod_exceptions_from_ics_file(
992
                form.cleaned_data['ics_file'], source=form.instance
996
                form.cleaned_data['ics_newfile'], source=form.instance
993 997
            )
994 998
        except ICSError as e:
995 999
            form.add_error(None, force_text(e))
......
1010 1014
class TimePeriodExceptionSourceRefreshView(ManagedDeskSubobjectMixin, DetailView):
1011 1015
    model = TimePeriodExceptionSource
1012 1016

  
1017
    def get_queryset(self):
1018
        queryset = super(TimePeriodExceptionSourceRefreshView, self).get_queryset()
1019
        return queryset.filter(ics_url__isnull=False)
1020

  
1013 1021
    def get(self, request, *args, **kwargs):
1014 1022
        try:
1015 1023
            source = self.get_object()
tests/test_agendas.py
370 370
    agenda.save()
371 371
    desk = Desk(label='Test 11 desk', agenda=agenda)
372 372
    desk.save()
373
    TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
373
    source = TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics')
374 374
    mocked_response = mock.Mock()
375 375
    mocked_response.status_code = 403
376 376
    mocked_get.return_value = mocked_response
......
386 386
        == 'unable to create timeperiod exceptions for "Test 11 desk": Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403).\n'
387 387
    )
388 388

  
389
    assert source.ics_url is not None
390
    assert source.ics_filename is None
391
    assert source.ics_file.name is None
392
    with mock.patch(
393
        'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics'
394
    ) as import_remote_ics:
395
        with mock.patch(
396
            'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file'
397
        ) as import_file_ics:
398
            call_command('sync_desks_timeperiod_exceptions')
399
    assert import_remote_ics.call_args_list == [mock.call('http://example.com/sample.ics', source=source)]
400
    assert import_file_ics.call_args_list == []
401

  
402
    source.ics_url = None
403
    source.ics_filename = 'sample.ics'
404
    source.ics_file = ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics')
405
    source.save()
406
    with mock.patch(
407
        'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics'
408
    ) as import_remote_ics:
409
        with mock.patch(
410
            'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file'
411
        ) as import_file_ics:
412
            call_command('sync_desks_timeperiod_exceptions')
413
    assert import_remote_ics.call_args_list == []
414
    assert import_file_ics.call_args_list == [mock.call(mock.ANY, source=source)]
415

  
416
    source.ics_file.delete()
417
    source.save()
418
    with mock.patch(
419
        'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics'
420
    ) as import_remote_ics:
421
        with mock.patch(
422
            'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file'
423
        ) as import_file_ics:
424
            call_command('sync_desks_timeperiod_exceptions')
425
    assert import_remote_ics.call_args_list == []
426
    assert import_file_ics.call_args_list == []
427

  
389 428

  
390 429
def test_base_meeting_duration():
391 430
    agenda = Agenda(label='Meeting', kind='meetings')
tests/test_manager.py
3 3
from __future__ import unicode_literals
4 4
import copy
5 5
import json
6
import os
6 7

  
7 8
from django.contrib.auth.models import User, Group
8 9
from django.utils.encoding import force_text
......
1266 1267
    exception = TimePeriodException.objects.latest('pk')
1267 1268
    assert exception.source == source
1268 1269
    assert source.ics_filename == 'exceptions.ics'
1270
    assert 'exceptions.ics' in source.ics_file.name
1269 1271
    assert source.ics_url is None
1270 1272
    resp = resp.follow()
1271 1273
    assert 'An exception has been imported.' in resp.text
......
1338 1340
    exception = TimePeriodException.objects.latest('pk')
1339 1341
    assert exception.source == source
1340 1342
    assert source.ics_filename is None
1343
    assert source.ics_file.name == ''
1341 1344
    assert source.ics_url == 'http://example.com/foo.ics'
1342 1345

  
1343 1346

  
......
1555 1558
    source = TimePeriodExceptionSource.objects.latest('pk')
1556 1559
    assert source.timeperiodexception_set.count() == 2
1557 1560
    exceptions = list(source.timeperiodexception_set.order_by('pk'))
1561
    old_ics_file_path = source.ics_file.path
1558 1562

  
1559 1563
    # replace the source
1560 1564
    resp = app.get('/manage/agendas/%d/' % agenda.pk).follow()
1561 1565
    resp = resp.click('Settings')
1562 1566
    resp = resp.click('upload')
1563 1567
    resp = resp.click(href='/manage/time-period-exceptions-source/%d/replace' % source.pk)
1564
    resp.form['ics_file'] = Upload('exceptions.ics', ics_file_content, 'text/calendar')
1568
    resp.form['ics_newfile'] = Upload('exceptions.ics', ics_file_content, 'text/calendar')
1565 1569
    resp = resp.form.submit().follow()
1570
    source.refresh_from_db()
1571
    assert source.ics_file.path != old_ics_file_path
1572
    assert os.path.exists(old_ics_file_path) is False
1566 1573
    assert TimePeriodException.objects.count() == 2
1567 1574
    assert source.timeperiodexception_set.count() == 2
1568 1575
    new_exceptions = list(source.timeperiodexception_set.order_by('pk'))
1569
-