Project

General

Profile

« Previous | Next » 

Revision a04356e2

Added by Benjamin Dauvergne over 13 years ago

agenda: add support for event exceptions

View differences:

calebasse/agenda/managers.py
class EventQuerySet(InheritanceQuerySet):
def for_today(self, today=None):
today = today or date.today()
excluded = self.filter(exceptions__exception_date=today).values_list('id', flat=True)
weeks = weeks_since_epoch(today)
filters = [Q(start_datetime__gte=datetime.combine(today, time()),
start_datetime__lte=datetime.combine(today, time(23,59,59)),
recurrence_periodicity__isnull=True) ]
recurrence_periodicity__isnull=True,
canceled=False) ]
base_q = Q(start_datetime__lte=datetime.combine(today, time(23,59,59)),
canceled=False,
recurrence_periodicity__isnull=False) & \
(Q(recurrence_end_date__gte=today) |
Q(recurrence_end_date__isnull=True))
Q(recurrence_end_date__isnull=True)) & \
~ Q(id__in=excluded)
# week periods
for period in range(1, 6):
filters.append(base_q & Q(recurrence_week_offset=weeks % period,
calebasse/agenda/models.py
old_rr_id = models.CharField(max_length=8, blank=True, null=True)
# only used when there is no rr id
old_rs_id = models.CharField(max_length=8, blank=True, null=True)
# exception to is mutually exclusive with recurrence_periodicity
# an exception cannot be periodic
exception_to = models.ForeignKey('self', related_name='exceptions',
blank=True, null=True,
verbose_name=u'Exception à')
exception_date = models.DateField(blank=True, null=True,
verbose_name=u'Reporté du')
# canceled can only be used with exception to
canceled = models.BooleanField(_('Annulé'))
PERIODS = (
(1, u'Toutes les semaines'),
......
verbose_name = u'Evénement'
verbose_name_plural = u'Evénements'
ordering = ('start_datetime', 'end_datetime', 'title')
unique_together = (('exception_to', 'exception_date'),)
def __init__(self, *args, **kwargs):
if kwargs.get('start_datetime') and not kwargs.has_key('recurrence_end_date'):
......
'''Distance between start and end of the event'''
return self.end_datetime - self.start_datetime
def today_occurence(self, today=None):
def match_date(self, date):
if self.is_recurring():
# consider exceptions
exception = self.get_exceptions_dict().get(date)
if exception is not None:
return exception if exception.match_date(date) else None
if self.canceled:
return None
if date.weekday() != self.recurrence_week_day:
return None
if self.start_datetime.date() > date:
return None
if self.recurrence_end_date and self.recurrence_end_date < date:
return None
if self.recurrence_week_period is not None:
if weeks_since_epoch(date) % self.recurrence_week_period != self.recurrence_week_offset:
return None
elif self.recurrence_week_parity is not None:
if date.isocalendar()[1] % 2 != self.recurrence_week_parity:
return None
elif self.recurrence_week_rank is not None:
if self.recurrence_week_rank not in weekday_ranks(date):
return None
else:
raise NotImplemented
return self
else:
return date == self.start_datetime.date()
def today_occurence(self, today=None, match=False):
'''For a recurring event compute the today 'Event'.
The computed event is the fake one that you cannot save to the database.
'''
today = today or date.today()
if not self.is_recurring():
if today == self.start_datetime.date():
return self
else:
return None
if today.weekday() != self.recurrence_week_day:
return None
if self.start_datetime.date() > today:
return None
if self.recurrence_end_date and self.recurrence_end_date < today:
if self.canceled:
return None
if self.recurrence_week_period is not None:
if weeks_since_epoch(today) % self.recurrence_week_period != self.recurrence_week_offset:
return None
elif self.recurrence_week_parity is not None:
if today.isocalendar()[1] % 2 != self.recurrence_week_parity:
return None
elif self.recurrence_week_rank is not None:
if self.recurrence_week_rank not in weekday_ranks(today):
if match:
exception = self.get_exceptions_dict().get(today)
if exception and exception.start_datetime.date() == today():
return exception.today_occurence(today, True)
else:
return None
else:
raise NotImplemented
exception_or_self = self.match_date(today)
if exception_or_self is None:
return None
if exception_or_self != self:
return exception_or_self.today_occurence(today)
start_datetime = datetime.combine(today, self.start_datetime.timetz())
end_datetime = start_datetime + self.timedelta()
event = copy(self)
event.exception_to = self
event.exception_date = today
event.start_datetime = start_datetime
event.end_datetime = end_datetime
event.recurrence_end_date = None
event.recurrence_week_offset = None
event.recurrence_periodicity = None
event.recurrence_week_offset = 0
event.recurrence_week_period = None
event.recurrence_week_parity = None
event.recurrence_week_rank = None
event.recurrence_end_date = None
event.parent = self
# the returned event is "virtual", it must not be saved
def save(*args, **kwarks):
raise RuntimeError()
old_save = event.save
old_participants = list(self.participants.all())
def save(*args, **kwargs):
event.id = None
old_save(*args, **kwargs)
event.participants = old_participants
event.save = save
return event
......
'''Is this event multiple ?'''
return self.recurrence_periodicity is not None
def get_exceptions_dict(self):
if not hasattr(self, 'exceptions_dict'):
self.exceptions_dict = dict()
for exception in self.exceptions.all():
self.exceptions_dict[exception.exception_date] = exception
return self.exceptions_dict
def all_occurences(self, limit=90):
'''Returns all occurences of this event as virtual Event objects
......
day = self.start_datetime.date()
max_end_date = max(date.today(), self.start_datetime.date()) + timedelta(days=limit)
end_date = min(self.recurrence_end_date or max_end_date, max_end_date)
occurrences = []
if self.recurrence_week_period is not None:
delta = timedelta(days=self.recurrence_week_period*7)
while day <= end_date:
yield self.today_occurence(day)
occurrence = self.today_occurence(day)
if occurrence is not None:
occurrences.append(occurrence)
day += delta
elif self.recurrence_week_parity is not None:
delta = timedelta(days=7)
while day <= end_date:
if day.isocalendar()[1] % 2 == self.recurrence_week_parity:
yield self.today_occurence(day)
if occurrence is not None:
occurrences.append(occurrence)
day += delta
elif self.recurrence_week_rank is not None:
delta = timedelta(days=7)
while day <= end_date:
if self.recurrence_week_rank in weekday_ranks(day):
yield self.today_occurence(day)
if occurrence is not None:
occurrences.append(occurrence)
day += delta
for exception in self.exceptions.all():
if exception.exception_date != exception.start_datetime.date():
occurrences.append(exception)
return sorted(occurrences, key=lambda o: o.start_datetime)
else:
yield self
return [self]
def save(self, *args, **kwargs):
self.clean() # force call to clean to initialize recurrence fields
super(Event, self).save(*args, **kwargs)
def delete(self, *args, **kwargs):
# never delete, only cancel
from ..actes.models import Act
for a in Act.objects.filter(parent_event=self):
if len(a.actvalidationstate_set.all()) > 1:
a.parent_event = None
a.save()
super(Event, self).delete(*args, **kwargs)
else:
a.delete()
self.canceled = True
self.save()
def to_interval(self):
return Interval(self.start_datetime, self.end_datetime)
calebasse/agenda/tests.py
self.assertEqual(Act.objects.filter(parent_event=appointment2).count(), 1)
self.assertEqual(Act.objects.count(), 2)
def test_weekly_event_with_exception(self):
'''
We create a single weekly event for 3 weeks between 2012-10-01
and 2012-10-15, then we add two exception:
- a change on the date for the second occurrence,
- a cancellation for the third occurrence.
'''
event = Event.objects.create(start_datetime=datetime(2012, 10, 1, 13),
end_datetime=datetime(2012, 10, 1, 13, 30), event_type=EventType(id=1),
recurrence_periodicity=1, recurrence_end_date=date(2012, 10, 15))
exception1 = Event.objects.create(start_datetime=datetime(2012, 10, 9, 13, 30),
end_datetime=datetime(2012, 10, 9, 14), event_type=EventType(id=1),
exception_to=event, exception_date=date(2012, 10, 8))
exception2 = Event.objects.create(start_datetime=datetime(2012, 10, 15, 13, 30),
end_datetime=datetime(2012, 10, 15, 14), event_type=EventType(id=1),
exception_to=event, exception_date=date(2012, 10, 15), canceled=True)
a = Event.objects.for_today(date(2012, 10, 1))
self.assertEqual(list(a), [event])
b = Event.objects.for_today(date(2012, 10, 8))
self.assertEqual(list(b), [])
b1 = Event.objects.for_today(date(2012, 10, 9))
self.assertEqual(list(b1), [exception1])
c = Event.objects.for_today(date(2012, 10, 15))
self.assertEqual(list(c), [])
def test_weekly_event_exception_creation(self):
'''
We create a single weekly event for 3 weeks between 2012-10-01
and 2012-10-15, then we list its occurrences, modify them and save them:
- a change on the date for the second occurrence,
- a cancellation for the third occurrence.
'''
wtype = WorkerType.objects.create(name='ElDoctor', intervene=True)
therapist1 = Worker.objects.create(first_name='Pierre', last_name='PaulJacques', type=wtype)
therapist2 = Worker.objects.create(first_name='Bob', last_name='Leponge', type=wtype)
event = Event.objects.create(start_datetime=datetime(2012, 10, 1, 13),
end_datetime=datetime(2012, 10, 1, 13, 30), event_type=EventType(id=1),
recurrence_periodicity=1, recurrence_end_date=date(2012, 10, 15))
event.participants = [ therapist1 ]
print repr(event)
occurrences = list(event.all_occurences())
print occurrences
self.assertEqual(len(occurrences), 3)
self.assertEqual(occurrences[1].start_datetime, datetime(2012, 10, 8, 13))
occurrences[1].start_datetime = datetime(2012, 10, 9, 13)
occurrences[1].end_datetime = datetime(2012, 10, 9, 13, 30)
occurrences[1].save()
print event.id, occurrences[1].id
occurrences[1].participants = [ therapist1, therapist2 ]
occurrences[2].canceled = True
occurrences[2].save()
a = Event.objects.today_occurences(date(2012, 10, 1))
self.assertEqual(list(a), [event])
print repr(a[0])
print event.participants.all()
self.assertEqual(set(a[0].participants.select_subclasses()), set([therapist1]))
a1 = list(a[0].all_occurences())
print a1
self.assertEqual(len(a1), 2)
b = Event.objects.for_today(date(2012, 10, 8))
self.assertEqual(list(b), [])
b1 = Event.objects.for_today(date(2012, 10, 9))
self.assertEqual(list(b1), [occurrences[1]])
self.assertEqual(set(b1[0].participants.select_subclasses()), set([therapist1, therapist2]))
c = Event.objects.for_today(date(2012, 10, 15))
self.assertEqual(list(c), [])

Also available in: Unified diff