0002-agendas-move-exception-refresh-logic-50723.patch
chrono/agendas/management/commands/sync_desks_timeperiod_exceptions.py | ||
---|---|---|
28 | 28 |
help = 'Synchronize time period exceptions from desks remote ics' |
29 | 29 | |
30 | 30 |
def handle(self, **options): |
31 |
for source in TimePeriodExceptionSource.objects.filter(
|
|
31 |
qs_url = TimePeriodExceptionSource.objects.filter(
|
|
32 | 32 |
Q(ics_file='') | Q(ics_file__isnull=True), ics_url__isnull=False |
33 |
): |
|
34 |
try: |
|
35 |
source.desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source) |
|
36 |
except ICSError as e: |
|
37 |
print( |
|
38 |
u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr |
|
39 |
) |
|
40 | ||
41 |
for source in TimePeriodExceptionSource.objects.filter(ics_url__isnull=True).exclude( |
|
33 |
) |
|
34 |
qs_file = TimePeriodExceptionSource.objects.filter(ics_url__isnull=True).exclude( |
|
42 | 35 |
Q(ics_file='') | Q(ics_file__isnull=True) |
43 |
): |
|
36 |
) |
|
37 |
for source in qs_url.union(qs_file): |
|
44 | 38 |
try: |
45 |
source.desk.import_timeperiod_exceptions_from_ics_file(source.ics_file, source=source)
|
|
39 |
source.refresh_timeperiod_exceptions_from_ics()
|
|
46 | 40 |
except ICSError as e: |
47 | 41 |
print( |
48 | 42 |
u'unable to create timeperiod exceptions for "%s": %s' % (source.desk, e), file=sys.stderr |
chrono/agendas/models.py | ||
---|---|---|
1415 | 1415 |
in_two_weeks = self.get_exceptions_within_two_weeks() |
1416 | 1416 |
return len(self.prefetched_exceptions) == len(in_two_weeks) |
1417 | 1417 | |
1418 |
def import_timeperiod_exceptions_from_remote_ics(self, ics_url, source=None): |
|
1419 |
try: |
|
1420 |
response = requests.get(ics_url, proxies=settings.REQUESTS_PROXIES) |
|
1421 |
response.raise_for_status() |
|
1422 |
except requests.HTTPError as e: |
|
1423 |
raise ICSError( |
|
1424 |
_('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).') |
|
1425 |
% {'url': ics_url, 'status_code': e.response.status_code} |
|
1426 |
) |
|
1427 |
except requests.RequestException as e: |
|
1428 |
raise ICSError( |
|
1429 |
_('Failed to retrieve remote calendar (%(url)s, %(exception)s).') |
|
1430 |
% {'url': ics_url, 'exception': e} |
|
1431 |
) |
|
1432 | ||
1433 |
if source is None: |
|
1434 |
source = TimePeriodExceptionSource(desk=self, ics_url=ics_url) |
|
1435 |
try: |
|
1436 |
# override response encoding received in HTTP headers as it may |
|
1437 |
# often be missing and defaults to iso-8859-15. |
|
1438 |
response.content.decode('utf-8') |
|
1439 |
response.encoding = 'utf-8' |
|
1440 |
except UnicodeDecodeError: |
|
1441 |
pass |
|
1442 |
return self._import_timeperiod_exceptions_from_ics(source=source, data=response.text) |
|
1443 | ||
1444 |
def import_timeperiod_exceptions_from_ics_file(self, ics_file, source=None): |
|
1445 |
if source is None: |
|
1446 |
source = TimePeriodExceptionSource(desk=self, ics_filename=ics_file.name, ics_file=ics_file) |
|
1447 |
return self._import_timeperiod_exceptions_from_ics(source=source, data=force_text(ics_file.read())) |
|
1448 | ||
1449 |
def _import_timeperiod_exceptions_from_ics(self, source, data, recurring_days=600): |
|
1450 |
try: |
|
1451 |
parsed = vobject.readOne(data) |
|
1452 |
except vobject.base.ParseError: |
|
1453 |
raise ICSError(_('File format is invalid.')) |
|
1454 | ||
1455 |
total_created = 0 |
|
1456 | ||
1457 |
if not parsed.contents.get('vevent'): |
|
1458 |
raise ICSError(_('The file doesn\'t contain any events.')) |
|
1459 | ||
1460 |
with transaction.atomic(): |
|
1461 |
if source.pk is None: |
|
1462 |
source.save() |
|
1463 |
# delete old exceptions related to this source |
|
1464 |
source.timeperiodexception_set.all().delete() |
|
1465 |
# create new exceptions |
|
1466 |
update_datetime = now() |
|
1467 |
for vevent in parsed.contents.get('vevent', []): |
|
1468 |
if 'summary' in vevent.contents: |
|
1469 |
summary = force_text(vevent.contents['summary'][0].value) |
|
1470 |
else: |
|
1471 |
summary = _('Exception') |
|
1472 |
try: |
|
1473 |
start_dt = vevent.dtstart.value |
|
1474 |
if not isinstance(start_dt, datetime.datetime): |
|
1475 |
start_dt = datetime.datetime.combine(start_dt, datetime.datetime.min.time()) |
|
1476 |
if not is_aware(start_dt): |
|
1477 |
start_dt = make_aware(start_dt) |
|
1478 |
except AttributeError: |
|
1479 |
raise ICSError(_('Event "%s" has no start date.') % summary) |
|
1480 |
try: |
|
1481 |
end_dt = vevent.dtend.value |
|
1482 |
if not isinstance(end_dt, datetime.datetime): |
|
1483 |
end_dt = datetime.datetime.combine(end_dt, datetime.datetime.min.time()) |
|
1484 |
if not is_aware(end_dt): |
|
1485 |
end_dt = make_aware(end_dt) |
|
1486 |
duration = end_dt - start_dt |
|
1487 |
except AttributeError: |
|
1488 |
try: |
|
1489 |
duration = vevent.duration.value |
|
1490 |
end_dt = start_dt + duration |
|
1491 |
except AttributeError: |
|
1492 |
# events without end date are considered as ending the same day |
|
1493 |
end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time())) |
|
1494 |
duration = end_dt - start_dt |
|
1495 | ||
1496 |
event = { |
|
1497 |
'start_datetime': start_dt, |
|
1498 |
'end_datetime': end_dt, |
|
1499 |
'label': summary, |
|
1500 |
'desk': self, |
|
1501 |
'source': source, |
|
1502 |
'recurrence_id': 0, |
|
1503 |
} |
|
1504 | ||
1505 |
if not vevent.rruleset: |
|
1506 |
# classical event |
|
1507 |
TimePeriodException.objects.create(**event) |
|
1508 |
total_created += 1 |
|
1509 |
elif vevent.rruleset.count(): |
|
1510 |
# recurring event until recurring_days in the future |
|
1511 |
from_dt = start_dt |
|
1512 |
until_dt = update_datetime + datetime.timedelta(days=recurring_days) |
|
1513 |
if not is_aware(vevent.rruleset[0]): |
|
1514 |
from_dt = make_naive(from_dt) |
|
1515 |
until_dt = make_naive(until_dt) |
|
1516 |
i = -1 |
|
1517 |
for i, start_dt in enumerate(vevent.rruleset.between(from_dt, until_dt, inc=True)): |
|
1518 |
# recompute start_dt and end_dt from occurrences and duration |
|
1519 |
if not is_aware(start_dt): |
|
1520 |
start_dt = make_aware(start_dt) |
|
1521 |
end_dt = start_dt + duration |
|
1522 |
event['recurrence_id'] = i |
|
1523 |
event['start_datetime'] = start_dt |
|
1524 |
event['end_datetime'] = end_dt |
|
1525 |
if end_dt >= update_datetime: |
|
1526 |
TimePeriodException.objects.create(**event) |
|
1527 |
total_created += 1 |
|
1528 | ||
1529 |
return total_created |
|
1530 | ||
1531 | 1418 |
def get_opening_hours(self, date): |
1532 | 1419 |
openslots = IntervalSet() |
1533 | 1420 |
for timeperiod in self.timeperiod_set.all(): |
... | ... | |
1680 | 1567 |
self.enabled = False |
1681 | 1568 |
self.save() |
1682 | 1569 | |
1570 |
def _check_ics_content(self): |
|
1571 |
if self.ics_url: |
|
1572 |
try: |
|
1573 |
response = requests.get(self.ics_url, proxies=settings.REQUESTS_PROXIES) |
|
1574 |
response.raise_for_status() |
|
1575 |
except requests.HTTPError as e: |
|
1576 |
raise ICSError( |
|
1577 |
_('Failed to retrieve remote calendar (%(url)s, HTTP error %(status_code)s).') |
|
1578 |
% {'url': self.ics_url, 'status_code': e.response.status_code} |
|
1579 |
) |
|
1580 |
except requests.RequestException as e: |
|
1581 |
raise ICSError( |
|
1582 |
_('Failed to retrieve remote calendar (%(url)s, %(exception)s).') |
|
1583 |
% {'url': self.ics_url, 'exception': e} |
|
1584 |
) |
|
1585 |
try: |
|
1586 |
# override response encoding received in HTTP headers as it may |
|
1587 |
# often be missing and defaults to iso-8859-15. |
|
1588 |
response.content.decode('utf-8') |
|
1589 |
response.encoding = 'utf-8' |
|
1590 |
except UnicodeDecodeError: |
|
1591 |
pass |
|
1592 |
data = response.text |
|
1593 |
else: |
|
1594 |
data = force_text(self.ics_file.read()) |
|
1595 | ||
1596 |
try: |
|
1597 |
parsed = vobject.readOne(data) |
|
1598 |
except vobject.base.ParseError: |
|
1599 |
raise ICSError(_('File format is invalid.')) |
|
1600 | ||
1601 |
if not parsed.contents.get('vevent'): |
|
1602 |
raise ICSError(_('The file doesn\'t contain any events.')) |
|
1603 | ||
1604 |
for vevent in parsed.contents.get('vevent', []): |
|
1605 |
summary = self._get_summary_from_vevent(vevent) |
|
1606 |
try: |
|
1607 |
vevent.dtstart.value |
|
1608 |
except AttributeError: |
|
1609 |
raise ICSError(_('Event "%s" has no start date.') % summary) |
|
1610 | ||
1611 |
return parsed |
|
1612 | ||
1613 |
def _get_summary_from_vevent(self, vevent): |
|
1614 |
if 'summary' in vevent.contents: |
|
1615 |
return force_text(vevent.contents['summary'][0].value) |
|
1616 |
return _('Exception') |
|
1617 | ||
1618 |
def refresh_timeperiod_exceptions(self, data=None): |
|
1619 |
self.refresh_timeperiod_exceptions_from_ics(data=data) |
|
1620 | ||
1621 |
def refresh_timeperiod_exceptions_from_ics(self, data=None, recurring_days=600): |
|
1622 |
if data is None: |
|
1623 |
parsed = self._check_ics_content() |
|
1624 |
else: |
|
1625 |
parsed = data |
|
1626 | ||
1627 |
with transaction.atomic(): |
|
1628 |
# delete old exceptions related to this source |
|
1629 |
self.timeperiodexception_set.all().delete() |
|
1630 |
# create new exceptions |
|
1631 |
update_datetime = now() |
|
1632 |
for vevent in parsed.contents.get('vevent', []): |
|
1633 |
summary = self._get_summary_from_vevent(vevent) |
|
1634 |
try: |
|
1635 |
start_dt = vevent.dtstart.value |
|
1636 |
if not isinstance(start_dt, datetime.datetime): |
|
1637 |
start_dt = datetime.datetime.combine(start_dt, datetime.datetime.min.time()) |
|
1638 |
if not is_aware(start_dt): |
|
1639 |
start_dt = make_aware(start_dt) |
|
1640 |
except AttributeError: |
|
1641 |
raise ICSError(_('Event "%s" has no start date.') % summary) |
|
1642 |
try: |
|
1643 |
end_dt = vevent.dtend.value |
|
1644 |
if not isinstance(end_dt, datetime.datetime): |
|
1645 |
end_dt = datetime.datetime.combine(end_dt, datetime.datetime.min.time()) |
|
1646 |
if not is_aware(end_dt): |
|
1647 |
end_dt = make_aware(end_dt) |
|
1648 |
duration = end_dt - start_dt |
|
1649 |
except AttributeError: |
|
1650 |
try: |
|
1651 |
duration = vevent.duration.value |
|
1652 |
end_dt = start_dt + duration |
|
1653 |
except AttributeError: |
|
1654 |
# events without end date are considered as ending the same day |
|
1655 |
end_dt = make_aware(datetime.datetime.combine(start_dt, datetime.datetime.max.time())) |
|
1656 |
duration = end_dt - start_dt |
|
1657 | ||
1658 |
event = { |
|
1659 |
'start_datetime': start_dt, |
|
1660 |
'end_datetime': end_dt, |
|
1661 |
'label': summary, |
|
1662 |
'desk_id': self.desk_id, |
|
1663 |
'source': self, |
|
1664 |
'recurrence_id': 0, |
|
1665 |
} |
|
1666 | ||
1667 |
if not vevent.rruleset: |
|
1668 |
# classical event |
|
1669 |
TimePeriodException.objects.create(**event) |
|
1670 |
elif vevent.rruleset.count(): |
|
1671 |
# recurring event until recurring_days in the future |
|
1672 |
from_dt = start_dt |
|
1673 |
until_dt = update_datetime + datetime.timedelta(days=recurring_days) |
|
1674 |
if not is_aware(vevent.rruleset[0]): |
|
1675 |
from_dt = make_naive(from_dt) |
|
1676 |
until_dt = make_naive(until_dt) |
|
1677 |
i = -1 |
|
1678 |
for i, start_dt in enumerate(vevent.rruleset.between(from_dt, until_dt, inc=True)): |
|
1679 |
# recompute start_dt and end_dt from occurrences and duration |
|
1680 |
if not is_aware(start_dt): |
|
1681 |
start_dt = make_aware(start_dt) |
|
1682 |
end_dt = start_dt + duration |
|
1683 |
event['recurrence_id'] = i |
|
1684 |
event['start_datetime'] = start_dt |
|
1685 |
event['end_datetime'] = end_dt |
|
1686 |
if end_dt >= update_datetime: |
|
1687 |
TimePeriodException.objects.create(**event) |
|
1688 | ||
1683 | 1689 |
@classmethod |
1684 | 1690 |
def import_json(cls, data): |
1685 | 1691 |
data = clean_import_data(cls, data) |
chrono/manager/views.py | ||
---|---|---|
24 | 24 | |
25 | 25 |
from django.contrib import messages |
26 | 26 |
from django.core.exceptions import PermissionDenied |
27 |
from django.db import transaction |
|
27 | 28 |
from django.db.models import Q, Value, BooleanField |
28 | 29 |
from django.db.models import Min, Max |
29 | 30 |
from django.http import Http404, HttpResponse, HttpResponseRedirect |
... | ... | |
2212 | 2213 | |
2213 | 2214 |
def import_file(self, desk, form): |
2214 | 2215 |
if form.cleaned_data['ics_file']: |
2215 |
exceptions = desk.import_timeperiod_exceptions_from_ics_file(form.cleaned_data['ics_file'])
|
|
2216 |
form.cleaned_data['ics_file'].seek(0)
|
|
2217 |
return exceptions
|
|
2216 |
ics_file = form.cleaned_data['ics_file']
|
|
2217 |
source = desk.timeperiodexceptionsource_set.create(ics_filename=ics_file.name, ics_file=ics_file)
|
|
2218 |
ics_file.seek(0)
|
|
2218 | 2219 |
elif form.cleaned_data['ics_url']: |
2219 |
return desk.import_timeperiod_exceptions_from_remote_ics(form.cleaned_data['ics_url']) |
|
2220 |
source = desk.timeperiodexceptionsource_set.create(ics_url=form.cleaned_data['ics_url']) |
|
2221 |
parsed = source._check_ics_content() |
|
2222 |
source._parsed = parsed |
|
2223 |
return source |
|
2220 | 2224 | |
2221 | 2225 |
def form_valid(self, form): |
2222 |
exceptions = None |
|
2223 | 2226 |
desk = self.get_object() |
2227 |
sources = [] |
|
2224 | 2228 |
try: |
2225 |
if desk.agenda.desk_simple_management:
|
|
2226 |
for _desk in desk.agenda.desk_set.all():
|
|
2227 |
result = self.import_file(_desk, form)
|
|
2228 |
exceptions = result if exceptions is None else exceptions
|
|
2229 |
else: |
|
2230 |
exceptions = self.import_file(desk, form)
|
|
2229 |
with transaction.atomic():
|
|
2230 |
if desk.agenda.desk_simple_management:
|
|
2231 |
for _desk in desk.agenda.desk_set.all():
|
|
2232 |
sources.append(self.import_file(_desk, form))
|
|
2233 |
else:
|
|
2234 |
sources.append(self.import_file(desk, form))
|
|
2231 | 2235 |
except ICSError as e: |
2232 | 2236 |
form.add_error(None, force_text(e)) |
2233 | 2237 |
return self.form_invalid(form) |
2234 | 2238 | |
2235 |
if exceptions is not None: |
|
2236 |
message = ungettext( |
|
2237 |
'An exception has been imported.', '%(count)d exceptions have been imported.', exceptions |
|
2238 |
) |
|
2239 |
message = message % {'count': exceptions} |
|
2240 |
messages.info(self.request, message) |
|
2239 |
try: |
|
2240 |
for source in sources: |
|
2241 |
source.refresh_timeperiod_exceptions(data=source._parsed) |
|
2242 |
except ICSError as e: |
|
2243 |
form.add_error(None, force_text(e)) |
|
2244 |
return self.form_invalid(form) |
|
2245 | ||
2246 |
messages.info(self.request, _('Exceptions will be imported in a few minutes.')) |
|
2241 | 2247 |
return super(DeskImportTimePeriodExceptionsView, self).form_valid(form) |
2242 | 2248 | |
2243 | 2249 | |
... | ... | |
2283 | 2289 |
return queryset.filter(ics_filename__isnull=False) |
2284 | 2290 | |
2285 | 2291 |
def import_file(self, desk, form): |
2286 |
exceptions = None |
|
2287 | 2292 |
source = desk.timeperiodexceptionsource_set.filter( |
2288 | 2293 |
ics_filename=self.get_object().ics_filename |
2289 | 2294 |
).first() |
2290 | 2295 |
if source is not None: |
2291 |
exceptions = desk.import_timeperiod_exceptions_from_ics_file( |
|
2292 |
form.cleaned_data['ics_newfile'], source=source |
|
2293 |
) |
|
2294 |
form.cleaned_data['ics_newfile'].seek(0) |
|
2295 |
return exceptions |
|
2296 |
source.refresh_timeperiod_exceptions() |
|
2296 | 2297 | |
2297 | 2298 |
def form_valid(self, form): |
2298 |
exceptions = None |
|
2299 | 2299 |
desk = self.get_object().desk |
2300 | 2300 |
try: |
2301 | 2301 |
if desk.agenda.desk_simple_management: |
2302 | 2302 |
for _desk in desk.agenda.desk_set.all(): |
2303 |
result = self.import_file(_desk, form) |
|
2304 |
exceptions = result if exceptions is None else exceptions |
|
2303 |
self.import_file(_desk, form) |
|
2305 | 2304 |
else: |
2306 |
exceptions = self.import_file(desk, form)
|
|
2305 |
self.import_file(desk, form) |
|
2307 | 2306 |
except ICSError as e: |
2308 | 2307 |
form.add_error(None, force_text(e)) |
2309 | 2308 |
return self.form_invalid(form) |
2310 | 2309 | |
2311 |
if exceptions is not None: |
|
2312 |
message = ungettext( |
|
2313 |
'An exception has been imported.', '%(count)d exceptions have been imported.', exceptions |
|
2314 |
) |
|
2315 |
message = message % {'count': exceptions} |
|
2316 |
messages.info(self.request, message) |
|
2310 |
messages.info(self.request, _('Exceptions will be synchronized in a few minutes.')) |
|
2317 | 2311 |
return super(TimePeriodExceptionSourceReplaceView, self).form_valid(form) |
2318 | 2312 | |
2319 | 2313 | |
... | ... | |
2328 | 2322 |
return queryset.filter(ics_url__isnull=False) |
2329 | 2323 | |
2330 | 2324 |
def import_file(self, desk): |
2331 |
exceptions = None |
|
2332 | 2325 |
source = desk.timeperiodexceptionsource_set.filter(ics_url=self.get_object().ics_url).first() |
2333 | 2326 |
if source is not None: |
2334 |
exceptions = desk.import_timeperiod_exceptions_from_remote_ics(source.ics_url, source=source) |
|
2335 |
return exceptions |
|
2327 |
source.refresh_timeperiod_exceptions() |
|
2336 | 2328 | |
2337 | 2329 |
def get(self, request, *args, **kwargs): |
2338 |
exceptions = None |
|
2339 | 2330 |
desk = self.get_object().desk |
2340 | 2331 |
try: |
2341 | 2332 |
if desk.agenda.desk_simple_management: |
2342 | 2333 |
for _desk in desk.agenda.desk_set.all(): |
2343 |
result = self.import_file(_desk) |
|
2344 |
exceptions = result if exceptions is None else exceptions |
|
2334 |
self.import_file(_desk) |
|
2345 | 2335 |
else: |
2346 |
exceptions = self.import_file(desk)
|
|
2336 |
self.import_file(desk) |
|
2347 | 2337 |
except ICSError as e: |
2348 | 2338 |
messages.error(self.request, force_text(e)) |
2349 | 2339 | |
2350 |
if exceptions is not None: |
|
2351 |
message = ungettext( |
|
2352 |
'An exception has been imported.', '%(count)d exceptions have been imported.', exceptions |
|
2353 |
) |
|
2354 |
message = message % {'count': exceptions} |
|
2355 |
messages.info(self.request, message) |
|
2356 | ||
2340 |
messages.info(self.request, _('Exceptions will be synchronized in a few minutes.')) |
|
2357 | 2341 |
# redirect to settings |
2358 | 2342 |
return HttpResponseRedirect(reverse('chrono-manager-agenda-settings', kwargs={'pk': desk.agenda_id})) |
2359 | 2343 |
tests/test_agendas.py | ||
---|---|---|
451 | 451 | |
452 | 452 | |
453 | 453 |
def test_timeperiodexception_creation_from_ics(): |
454 |
agenda = Agenda(label=u'Test 1 agenda') |
|
455 |
agenda.save() |
|
456 |
desk = Desk(label='Test 1 desk', agenda=agenda) |
|
457 |
desk.save() |
|
458 |
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file( |
|
459 |
ContentFile(ICS_SAMPLE, name='sample.ics') |
|
454 |
agenda = Agenda.objects.create(label=u'Test 1 agenda') |
|
455 |
desk = Desk.objects.create(label='Test 1 desk', agenda=agenda) |
|
456 |
source = desk.timeperiodexceptionsource_set.create( |
|
457 |
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE, name='sample.ics') |
|
460 | 458 |
) |
461 |
assert exceptions_count == 2
|
|
459 |
source.refresh_timeperiod_exceptions_from_ics()
|
|
462 | 460 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
463 | 461 | |
464 | 462 | |
465 | 463 |
def test_timeperiodexception_creation_from_ics_without_startdt(): |
466 |
agenda = Agenda(label=u'Test 2 agenda') |
|
467 |
agenda.save() |
|
468 |
desk = Desk(label='Test 2 desk', agenda=agenda) |
|
469 |
desk.save() |
|
464 |
agenda = Agenda.objects.create(label=u'Test 2 agenda') |
|
465 |
desk = Desk.objects.create(label='Test 2 desk', agenda=agenda) |
|
470 | 466 |
lines = [] |
471 | 467 |
# remove start datetimes from ics |
472 | 468 |
for line in ICS_SAMPLE.splitlines(): |
... | ... | |
474 | 470 |
continue |
475 | 471 |
lines.append(line) |
476 | 472 |
ics_sample = ContentFile("\n".join(lines), name='sample.ics') |
473 |
source = desk.timeperiodexceptionsource_set.create(ics_filename='sample.ics', ics_file=ics_sample) |
|
477 | 474 |
with pytest.raises(ICSError) as e: |
478 |
desk.import_timeperiod_exceptions_from_ics_file(ics_sample)
|
|
475 |
source._check_ics_content()
|
|
479 | 476 |
assert 'Event "Événement 1" has no start date.' == str(e.value) |
480 | 477 | |
481 | 478 | |
482 | 479 |
def test_timeperiodexception_creation_from_ics_without_enddt(): |
483 |
agenda = Agenda(label=u'Test 3 agenda') |
|
484 |
agenda.save() |
|
485 |
desk = Desk(label='Test 3 desk', agenda=agenda) |
|
486 |
desk.save() |
|
480 |
agenda = Agenda.objects.create(label=u'Test 3 agenda') |
|
481 |
desk = Desk.objects.create(label='Test 3 desk', agenda=agenda) |
|
487 | 482 |
lines = [] |
488 | 483 |
# remove end datetimes from ics |
489 | 484 |
for line in ICS_SAMPLE.splitlines(): |
... | ... | |
491 | 486 |
continue |
492 | 487 |
lines.append(line) |
493 | 488 |
ics_sample = ContentFile("\n".join(lines), name='sample.ics') |
494 |
desk.import_timeperiod_exceptions_from_ics_file(ics_sample) |
|
489 |
source = desk.timeperiodexceptionsource_set.create(ics_filename='sample.ics', ics_file=ics_sample) |
|
490 |
source.refresh_timeperiod_exceptions_from_ics() |
|
495 | 491 |
for exception in TimePeriodException.objects.filter(desk=desk): |
496 | 492 |
end_time = localtime(exception.end_datetime).time() |
497 | 493 |
assert end_time == datetime.time(23, 59, 59, 999999) |
... | ... | |
499 | 495 | |
500 | 496 |
@pytest.mark.freeze_time('2017-12-01') |
501 | 497 |
def test_timeperiodexception_creation_from_ics_with_recurrences(): |
502 |
agenda = Agenda(label=u'Test 4 agenda') |
|
503 |
agenda.save() |
|
504 |
desk = Desk(label='Test 4 desk', agenda=agenda) |
|
505 |
desk.save() |
|
506 |
assert ( |
|
507 |
desk.import_timeperiod_exceptions_from_ics_file( |
|
508 |
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics') |
|
509 |
) |
|
510 |
== 3 |
|
498 |
agenda = Agenda.objects.create(label=u'Test 4 agenda') |
|
499 |
desk = Desk.objects.create(label='Test 4 desk', agenda=agenda) |
|
500 |
source = desk.timeperiodexceptionsource_set.create( |
|
501 |
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT, name='sample.ics') |
|
511 | 502 |
) |
503 |
source.refresh_timeperiod_exceptions_from_ics() |
|
512 | 504 |
assert TimePeriodException.objects.filter(desk=desk).count() == 3 |
513 | 505 | |
514 | 506 | |
515 | 507 |
def test_timeexception_creation_from_ics_with_dates(): |
516 |
agenda = Agenda(label=u'Test 5 agenda') |
|
517 |
agenda.save() |
|
518 |
desk = Desk(label='Test 5 desk', agenda=agenda) |
|
519 |
desk.save() |
|
508 |
agenda = Agenda.objects.create(label=u'Test 5 agenda') |
|
509 |
desk = Desk.objects.create(label='Test 5 desk', agenda=agenda) |
|
520 | 510 |
lines = [] |
521 | 511 |
# remove end datetimes from ics |
522 | 512 |
for line in ICS_SAMPLE_WITH_RECURRENT_EVENT.splitlines(): |
... | ... | |
524 | 514 |
continue |
525 | 515 |
lines.append(line) |
526 | 516 |
ics_sample = ContentFile("\n".join(lines), name='sample.ics') |
527 |
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(ics_sample) |
|
528 |
assert exceptions_count == 2 |
|
517 |
source = desk.timeperiodexceptionsource_set.create(ics_filename='sample.ics', ics_file=ics_sample) |
|
518 |
source.refresh_timeperiod_exceptions_from_ics() |
|
519 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
|
529 | 520 |
for exception in TimePeriodException.objects.filter(desk=desk): |
530 | 521 |
assert localtime(exception.start_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0)) |
531 | 522 |
assert localtime(exception.end_datetime) == make_aware(datetime.datetime(2018, 1, 1, 0, 0)) |
532 | 523 | |
533 | 524 | |
534 | 525 |
def test_timeexception_create_from_invalid_ics(): |
535 |
agenda = Agenda(label=u'Test 6 agenda') |
|
536 |
agenda.save() |
|
537 |
desk = Desk(label='Test 6 desk', agenda=agenda) |
|
538 |
desk.save() |
|
526 |
agenda = Agenda.objects.create(label=u'Test 6 agenda') |
|
527 |
desk = Desk.objects.create(label='Test 6 desk', agenda=agenda) |
|
528 |
source = desk.timeperiodexceptionsource_set.create( |
|
529 |
ics_filename='sample.ics', ics_file=ContentFile(INVALID_ICS_SAMPLE, name='sample.ics') |
|
530 |
) |
|
539 | 531 |
with pytest.raises(ICSError) as e: |
540 |
desk.import_timeperiod_exceptions_from_ics_file(ContentFile(INVALID_ICS_SAMPLE, name='sample.ics'))
|
|
532 |
source._check_ics_content()
|
|
541 | 533 |
assert str(e.value) == 'File format is invalid.' |
542 | 534 | |
543 | 535 | |
544 | 536 |
def test_timeexception_create_from_ics_with_no_events(): |
545 |
agenda = Agenda(label=u'Test 7 agenda') |
|
546 |
agenda.save() |
|
547 |
desk = Desk(label='Test 7 desk', agenda=agenda) |
|
548 |
desk.save() |
|
537 |
agenda = Agenda.objects.create(label=u'Test 7 agenda') |
|
538 |
desk = Desk.objects.create(label='Test 7 desk', agenda=agenda) |
|
539 |
source = desk.timeperiodexceptionsource_set.create( |
|
540 |
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics') |
|
541 |
) |
|
549 | 542 |
with pytest.raises(ICSError) as e: |
550 |
desk.import_timeperiod_exceptions_from_ics_file( |
|
551 |
ContentFile(ICS_SAMPLE_WITH_NO_EVENTS, name='sample.ics') |
|
552 |
) |
|
543 |
source._check_ics_content() |
|
553 | 544 |
assert str(e.value) == "The file doesn't contain any events." |
554 | 545 | |
555 | 546 | |
556 | 547 |
@mock.patch('chrono.agendas.models.requests.get') |
557 | 548 |
def test_timeperiodexception_creation_from_remote_ics(mocked_get): |
558 |
agenda = Agenda(label=u'Test 8 agenda') |
|
559 |
agenda.save() |
|
560 |
desk = Desk(label='Test 8 desk', agenda=agenda) |
|
561 |
desk.save() |
|
549 |
agenda = Agenda.objects.create(label=u'Test 8 agenda') |
|
550 |
desk = Desk.objects.create(label='Test 8 desk', agenda=agenda) |
|
562 | 551 |
mocked_response = mock.Mock() |
563 | 552 |
mocked_response.text = ICS_SAMPLE |
564 | 553 |
mocked_get.return_value = mocked_response |
565 |
exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics') |
|
566 |
assert exceptions_count == 2 |
|
554 |
source = desk.timeperiodexceptionsource_set.create(ics_url='http://example.com/sample.ics') |
|
555 |
source.refresh_timeperiod_exceptions_from_ics() |
|
556 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
|
567 | 557 |
assert 'Événement 1' in [x.label for x in desk.timeperiodexception_set.all()] |
568 | 558 | |
569 | 559 |
mocked_response.text = ICS_SAMPLE_WITH_NO_EVENTS |
570 | 560 |
mocked_get.return_value = mocked_response |
571 | 561 |
with pytest.raises(ICSError) as e: |
572 |
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
562 |
source._check_ics_content()
|
|
573 | 563 |
assert str(e.value) == "The file doesn't contain any events." |
574 | 564 | |
575 | 565 | |
576 | 566 |
@mock.patch('chrono.agendas.models.requests.get') |
577 | 567 |
def test_timeperiodexception_remote_ics_encoding(mocked_get): |
578 |
agenda = Agenda(label=u'Test 8 agenda') |
|
579 |
agenda.save() |
|
580 |
desk = Desk(label='Test 8 desk', agenda=agenda) |
|
581 |
desk.save() |
|
568 |
agenda = Agenda.objects.create(label=u'Test 8 agenda') |
|
569 |
desk = Desk.objects.create(label='Test 8 desk', agenda=agenda) |
|
582 | 570 |
mocked_response = mock.Mock() |
583 | 571 |
mocked_response.content = ICS_SAMPLE.encode('iso-8859-15') |
584 | 572 |
mocked_response.text = ICS_SAMPLE |
585 | 573 |
mocked_get.return_value = mocked_response |
586 |
exceptions_count = desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics') |
|
587 |
assert exceptions_count == 2 |
|
574 |
source = desk.timeperiodexceptionsource_set.create(ics_url='http://example.com/sample.ics') |
|
575 |
source.refresh_timeperiod_exceptions_from_ics() |
|
576 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
|
588 | 577 |
assert 'Événement 1' in [x.label for x in desk.timeperiodexception_set.all()] |
589 | 578 | |
590 | 579 | |
591 | 580 |
@mock.patch('chrono.agendas.models.requests.get') |
592 | 581 |
def test_timeperiodexception_creation_from_unreachable_remote_ics(mocked_get): |
593 |
agenda = Agenda(label=u'Test 9 agenda') |
|
594 |
agenda.save() |
|
595 |
desk = Desk(label='Test 9 desk', agenda=agenda) |
|
596 |
desk.save() |
|
582 |
agenda = Agenda.objects.create(label=u'Test 9 agenda') |
|
583 |
desk = Desk.objects.create(label='Test 9 desk', agenda=agenda) |
|
597 | 584 |
mocked_response = mock.Mock() |
598 | 585 |
mocked_response.text = ICS_SAMPLE |
599 | 586 |
mocked_get.return_value = mocked_response |
... | ... | |
601 | 588 |
def mocked_requests_connection_error(*args, **kwargs): |
602 | 589 |
raise requests.ConnectionError('unreachable') |
603 | 590 | |
591 |
source = desk.timeperiodexceptionsource_set.create(ics_url='http://example.com/sample.ics') |
|
604 | 592 |
mocked_get.side_effect = mocked_requests_connection_error |
605 | 593 |
with pytest.raises(ICSError) as e: |
606 |
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
594 |
source._check_ics_content()
|
|
607 | 595 |
assert str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, unreachable)." |
608 | 596 | |
609 | 597 | |
610 | 598 |
@mock.patch('chrono.agendas.models.requests.get') |
611 | 599 |
def test_timeperiodexception_creation_from_forbidden_remote_ics(mocked_get): |
612 |
agenda = Agenda(label=u'Test 10 agenda') |
|
613 |
agenda.save() |
|
614 |
desk = Desk(label='Test 10 desk', agenda=agenda) |
|
615 |
desk.save() |
|
600 |
agenda = Agenda.objects.create(label=u'Test 10 agenda') |
|
601 |
desk = Desk.objects.create(label='Test 10 desk', agenda=agenda) |
|
616 | 602 |
mocked_response = mock.Mock() |
617 | 603 |
mocked_response.status_code = 403 |
618 | 604 |
mocked_get.return_value = mocked_response |
... | ... | |
620 | 606 |
def mocked_requests_http_forbidden_error(*args, **kwargs): |
621 | 607 |
raise requests.HTTPError(response=mocked_response) |
622 | 608 | |
609 |
source = desk.timeperiodexceptionsource_set.create(ics_url='http://example.com/sample.ics') |
|
623 | 610 |
mocked_get.side_effect = mocked_requests_http_forbidden_error |
624 | ||
625 | 611 |
with pytest.raises(ICSError) as e: |
626 |
desk.import_timeperiod_exceptions_from_remote_ics('http://example.com/sample.ics')
|
|
612 |
source._check_ics_content()
|
|
627 | 613 |
assert ( |
628 | 614 |
str(e.value) == "Failed to retrieve remote calendar (http://example.com/sample.ics, HTTP error 403)." |
629 | 615 |
) |
... | ... | |
631 | 617 | |
632 | 618 |
@mock.patch('chrono.agendas.models.requests.get') |
633 | 619 |
def test_sync_desks_timeperiod_exceptions_from_ics(mocked_get, capsys): |
634 |
agenda = Agenda(label=u'Test 11 agenda') |
|
635 |
agenda.save() |
|
636 |
desk = Desk(label='Test 11 desk', agenda=agenda) |
|
637 |
desk.save() |
|
620 |
agenda = Agenda.objects.create(label=u'Test 11 agenda') |
|
621 |
desk = Desk.objects.create(label='Test 11 desk', agenda=agenda) |
|
638 | 622 |
source = TimePeriodExceptionSource.objects.create(desk=desk, ics_url='http://example.com/sample.ics') |
639 | 623 |
mocked_response = mock.Mock() |
640 | 624 |
mocked_response.status_code = 403 |
... | ... | |
655 | 639 |
assert source.ics_filename is None |
656 | 640 |
assert source.ics_file.name is None |
657 | 641 |
with mock.patch( |
658 |
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics' |
|
659 |
) as import_remote_ics: |
|
660 |
with mock.patch( |
|
661 |
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file' |
|
662 |
) as import_file_ics: |
|
663 |
call_command('sync_desks_timeperiod_exceptions') |
|
664 |
assert import_remote_ics.call_args_list == [mock.call('http://example.com/sample.ics', source=source)] |
|
665 |
assert import_file_ics.call_args_list == [] |
|
642 |
'chrono.agendas.models.TimePeriodExceptionSource.refresh_timeperiod_exceptions_from_ics' |
|
643 |
) as refresh: |
|
644 |
call_command('sync_desks_timeperiod_exceptions') |
|
645 |
assert refresh.call_args_list == [mock.call()] |
|
666 | 646 | |
667 | 647 |
source.ics_url = None |
668 | 648 |
source.ics_filename = 'sample.ics' |
669 | 649 |
source.ics_file = ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics') |
670 | 650 |
source.save() |
671 | 651 |
with mock.patch( |
672 |
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics' |
|
673 |
) as import_remote_ics: |
|
674 |
with mock.patch( |
|
675 |
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file' |
|
676 |
) as import_file_ics: |
|
677 |
call_command('sync_desks_timeperiod_exceptions') |
|
678 |
assert import_remote_ics.call_args_list == [] |
|
679 |
assert import_file_ics.call_args_list == [mock.call(mock.ANY, source=source)] |
|
652 |
'chrono.agendas.models.TimePeriodExceptionSource.refresh_timeperiod_exceptions_from_ics' |
|
653 |
) as refresh: |
|
654 |
call_command('sync_desks_timeperiod_exceptions') |
|
655 |
assert refresh.call_args_list == [mock.call()] |
|
680 | 656 | |
681 | 657 |
TimePeriodExceptionSource.objects.update(ics_file='') |
682 | 658 |
with mock.patch( |
683 |
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics' |
|
684 |
) as import_remote_ics: |
|
685 |
with mock.patch( |
|
686 |
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file' |
|
687 |
) as import_file_ics: |
|
688 |
call_command('sync_desks_timeperiod_exceptions') |
|
689 |
assert import_remote_ics.call_args_list == [] |
|
690 |
assert import_file_ics.call_args_list == [] |
|
659 |
'chrono.agendas.models.TimePeriodExceptionSource.refresh_timeperiod_exceptions_from_ics' |
|
660 |
) as refresh: |
|
661 |
call_command('sync_desks_timeperiod_exceptions') |
|
662 |
assert refresh.call_args_list == [] |
|
691 | 663 | |
692 | 664 |
TimePeriodExceptionSource.objects.update(ics_file=None) |
693 | 665 |
with mock.patch( |
694 |
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_remote_ics' |
|
695 |
) as import_remote_ics: |
|
696 |
with mock.patch( |
|
697 |
'chrono.agendas.models.Desk.import_timeperiod_exceptions_from_ics_file' |
|
698 |
) as import_file_ics: |
|
699 |
call_command('sync_desks_timeperiod_exceptions') |
|
700 |
assert import_remote_ics.call_args_list == [] |
|
701 |
assert import_file_ics.call_args_list == [] |
|
666 |
'chrono.agendas.models.TimePeriodExceptionSource.refresh_timeperiod_exceptions_from_ics' |
|
667 |
) as refresh: |
|
668 |
call_command('sync_desks_timeperiod_exceptions') |
|
669 |
assert refresh.call_args_list == [] |
|
702 | 670 | |
703 | 671 | |
704 | 672 |
@override_settings( |
... | ... | |
811 | 779 |
def test_timeperiodexception_creation_from_ics_with_duration(): |
812 | 780 |
# test that event defined using duration works and give the same start and |
813 | 781 |
# end dates |
814 |
agenda = Agenda(label=u'Test 1 agenda') |
|
815 |
agenda.save() |
|
816 |
desk = Desk(label='Test 1 desk', agenda=agenda) |
|
817 |
desk.save() |
|
818 |
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file( |
|
819 |
ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics') |
|
782 |
agenda = Agenda.objects.create(label=u'Test 1 agenda') |
|
783 |
desk = Desk.objects.create(label='Test 1 desk', agenda=agenda) |
|
784 |
source = desk.timeperiodexceptionsource_set.create( |
|
785 |
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE_WITH_DURATION, name='sample.ics') |
|
820 | 786 |
) |
821 |
assert exceptions_count == 2
|
|
787 |
source.refresh_timeperiod_exceptions_from_ics()
|
|
822 | 788 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
823 | 789 |
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set( |
824 | 790 |
[ |
... | ... | |
838 | 804 |
def test_timeperiodexception_creation_from_ics_with_recurrences_in_the_past(): |
839 | 805 |
# test that recurrent events before today are not created |
840 | 806 |
# also test that duration + recurrent events works |
841 |
agenda = Agenda(label=u'Test 4 agenda') |
|
842 |
agenda.save() |
|
843 |
desk = Desk(label='Test 4 desk', agenda=agenda) |
|
844 |
desk.save() |
|
845 |
assert ( |
|
846 |
desk.import_timeperiod_exceptions_from_ics_file( |
|
847 |
ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics') |
|
848 |
) |
|
849 |
== 2 |
|
807 |
agenda = Agenda.objects.create(label=u'Test 4 agenda') |
|
808 |
desk = Desk.objects.create(label='Test 4 desk', agenda=agenda) |
|
809 |
source = desk.timeperiodexceptionsource_set.create( |
|
810 |
ics_filename='sample.ics', |
|
811 |
ics_file=ContentFile(ICS_SAMPLE_WITH_RECURRENT_EVENT_IN_THE_PAST, name='sample.ics'), |
|
850 | 812 |
) |
813 |
source.refresh_timeperiod_exceptions_from_ics() |
|
851 | 814 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
852 | 815 |
assert set(TimePeriodException.objects.values_list('start_datetime', flat=True)) == set( |
853 | 816 |
[make_aware(datetime.datetime(2018, 1, 1)), make_aware(datetime.datetime(2019, 1, 1))] |
... | ... | |
855 | 818 | |
856 | 819 | |
857 | 820 |
def test_timeperiodexception_creation_from_ics_with_recurrences_atreal(): |
858 |
agenda = Agenda(label=u'Test atreal agenda') |
|
859 |
agenda.save() |
|
860 |
desk = Desk(label='Test atreal desk', agenda=agenda) |
|
861 |
desk.save() |
|
862 |
assert desk.import_timeperiod_exceptions_from_ics_file(ContentFile(ICS_ATREAL, name='sample.ics')) |
|
821 |
agenda = Agenda.objects.create(label=u'Test atreal agenda') |
|
822 |
desk = Desk.objects.create(label='Test atreal desk', agenda=agenda) |
|
823 |
source = desk.timeperiodexceptionsource_set.create( |
|
824 |
ics_filename='sample.ics', ics_file=ContentFile(ICS_ATREAL, name='sample.ics') |
|
825 |
) |
|
826 |
source.refresh_timeperiod_exceptions_from_ics() |
|
827 |
assert TimePeriodException.objects.filter(desk=desk).exists() |
|
863 | 828 | |
864 | 829 | |
865 | 830 |
def test_management_role_deletion(): |
... | ... | |
1245 | 1210 |
def test_desk_duplicate_exception_sources(): |
1246 | 1211 |
agenda = Agenda.objects.create(label='Agenda') |
1247 | 1212 |
desk = Desk.objects.create(label='Desk', agenda=agenda) |
1248 |
exceptions_count = desk.import_timeperiod_exceptions_from_ics_file(
|
|
1249 |
ContentFile(ICS_SAMPLE, name='sample.ics') |
|
1213 |
source = desk.timeperiodexceptionsource_set.create(
|
|
1214 |
ics_filename='sample.ics', ics_file=ContentFile(ICS_SAMPLE, name='sample.ics')
|
|
1250 | 1215 |
) |
1251 |
source = desk.timeperiodexceptionsource_set.get(ics_filename='sample.ics') |
|
1252 |
assert exceptions_count == 2 |
|
1216 |
source.refresh_timeperiod_exceptions_from_ics() |
|
1253 | 1217 |
assert TimePeriodException.objects.filter(desk=desk).count() == 2 |
1254 | 1218 | |
1255 | 1219 |
new_desk = desk.duplicate(label="New Desk") |
1256 | 1220 |
new_source = new_desk.timeperiodexceptionsource_set.get(ics_filename='sample.ics') |
1257 |
assert new_desk.timeperiodexception_set.count() == exceptions_count
|
|
1221 |
assert new_desk.timeperiodexception_set.count() == 2
|
|
1258 | 1222 | |
1259 | 1223 |
source.delete() |
1260 |
assert new_desk.timeperiodexception_set.count() == exceptions_count
|
|
1224 |
assert new_desk.timeperiodexception_set.count() == 2
|
|
1261 | 1225 | |
1262 | 1226 |
new_source.delete() |
1263 | 1227 |
assert not new_desk.timeperiodexception_set.exists() |
tests/test_manager.py | ||
---|---|---|
2919 | 2919 |
assert "To add new exceptions, you can upload a file or specify an address to a remote calendar." in resp |
2920 | 2920 |
resp = resp.form.submit(status=200) |
2921 | 2921 |
assert 'Please provide an ICS File or an URL.' in resp.text |
2922 |
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 0 |
|
2922 | 2923 |
resp.form['ics_file'] = Upload('exceptions.ics', b'invalid content', 'text/calendar') |
2923 | 2924 |
resp = resp.form.submit(status=200) |
2924 | 2925 |
assert 'File format is invalid' in resp.text |
2926 |
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 0 |
|
2925 | 2927 |
ics_with_no_start_date = b"""BEGIN:VCALENDAR |
2926 | 2928 |
VERSION:2.0 |
2927 | 2929 |
PRODID:-//foo.bar//EN |
... | ... | |
2933 | 2935 |
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_start_date, 'text/calendar') |
2934 | 2936 |
resp = resp.form.submit(status=200) |
2935 | 2937 |
assert 'Event &quot;New Year&#39;s Eve&quot; has no start date.' in resp.text |
2938 |
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 0 |
|
2936 | 2939 |
ics_with_no_events = b"""BEGIN:VCALENDAR |
2937 | 2940 |
VERSION:2.0 |
2938 | 2941 |
PRODID:-//foo.bar//EN |
... | ... | |
2940 | 2943 |
resp.form['ics_file'] = Upload('exceptions.ics', ics_with_no_events, 'text/calendar') |
2941 | 2944 |
resp = resp.form.submit(status=200) |
2942 | 2945 |
assert "The file doesn't contain any events." in resp.text |
2946 |
assert TimePeriodExceptionSource.objects.filter(desk=desk).count() == 0 |
|
2943 | 2947 | |
2944 | 2948 |
ics_with_exceptions = b"""BEGIN:VCALENDAR |
2945 | 2949 |
VERSION:2.0 |
... | ... | |
2962 | 2966 |
assert 'exceptions.ics' in source.ics_file.name |
2963 | 2967 |
assert source.ics_url is None |
2964 | 2968 |
resp = resp.follow() |
2965 |
assert 'An exception has been imported.' in resp.text
|
|
2969 |
assert 'Exceptions will be imported in a few minutes.' in resp.text
|
|
2966 | 2970 | |
2967 | 2971 | |
2968 | 2972 |
@pytest.mark.freeze_time('2017-12-01') |
2969 |
- |