#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, absolute_import
from six import PY2, PY3, StringIO, string_types, text_type, integer_types
from six.moves import filter, map, range
import arrow
import copy
import re
from datetime import timedelta, datetime
from .alarm import AlarmFactory
from .component import Component
from .utils import (
parse_duration,
timedelta_to_duration,
iso_to_arrow,
iso_precision,
get_arrow,
arrow_to_iso,
arrow_date_to_iso,
uid_gen,
unescape_string,
escape_string,
)
from .parse import ContentLine, Container
[docs]class Event(Component):
"""A calendar event.
Can be full-day or between two instants.
Can be defined by a beginning instant and
a duration *or* end instant.
"""
_TYPE = "VEVENT"
_EXTRACTORS = []
_OUTPUTS = []
[docs] def __init__(self,
name=None,
begin=None,
end=None,
duration=None,
uid=None,
description=None,
created=None,
location=None,
url=None,
transparent=False,
alarms=None,
categories=None):
"""Instantiates a new :class:`ics.event.Event`.
Args:
name (string) : rfc5545 SUMMARY property
begin (Arrow-compatible)
end (Arrow-compatible)
duration (datetime.timedelta)
uid (string): must be unique
description (string)
created (Arrow-compatible)
location (string)
url (string)
transparent (Boolean)
alarms (:class:`ics.alarm.Alarm`
categories (set of string)
Raises:
ValueError: if `end` and `duration` are specified at the same time
"""
self._duration = None
self._end_time = None
self._begin = None
self._begin_precision = None
self.uid = uid_gen() if not uid else uid
self.description = description
self.created = get_arrow(created)
self.location = location
self.url = url
self.transparent = transparent
self.alarms = set()
self.categories = set()
self._unused = Container(name='VEVENT')
self.name = name
self.begin = begin
# TODO: DRY [1]
if duration and end:
raise ValueError(
'Event() may not specify an end and a duration \
at the same time')
elif end: # End was specified
self.end = end
elif duration: # Duration was specified
self.duration = duration
if alarms is not None:
self.alarms.update(set(alarms))
if categories is not None:
self.categories.update(set(categories))
[docs] def has_end(self):
"""
Return:
bool: self has an end
"""
return bool(self._end_time or self._duration)
@property
def begin(self):
"""Get or set the beginning of the event.
| Will return an :class:`Arrow` object.
| May be set to anything that :func:`Arrow.get` understands.
| If an end is defined (not a duration), .begin must not
be set to a superior value.
"""
return self._begin
@begin.setter
def begin(self, value):
value = get_arrow(value)
if value and self._end_time and value > self._end_time:
raise ValueError('Begin must be before end')
self._begin = value
self._begin_precision = 'second'
@property
def end(self):
"""Get or set the end of the event.
| Will return an :class:`Arrow` object.
| May be set to anything that :func:`Arrow.get` understands.
| If set to a non null value, removes any already
existing duration.
| Setting to None will have unexpected behavior if
begin is not None.
| Must not be set to an inferior value than self.begin.
"""
if self._duration: # if end is duration defined
# return the beginning + duration
return self.begin + self._duration
elif self._end_time: # if end is time defined
if self.all_day:
return self._end_time + timedelta(days=1)
else:
return self._end_time
elif self._begin: # if end is not defined
if self.all_day:
return self._begin + timedelta(days=1)
else:
# instant event
return self._begin
else:
return None
@end.setter
def end(self, value):
value = get_arrow(value)
if value and value < self._begin:
raise ValueError('End must be after begin')
self._end_time = value
if value:
self._duration = None
@property
def duration(self):
"""Get or set the duration of the event.
| Will return a timedelta object.
| May be set to anything that timedelta() understands.
| May be set with a dict ({"days":2, "hours":6}).
| If set to a non null value, removes any already
existing end time.
"""
if self._duration:
return self._duration
elif self.end:
# because of the clever getter for end, this also takes care of all_day events
return self.end - self.begin
else:
# event has neither start, nor end, nor duration
return None
@duration.setter
def duration(self, value):
if isinstance(value, dict):
value = timedelta(**value)
elif isinstance(value, timedelta):
value = value
elif value is not None:
value = timedelta(value)
if value:
self._end_time = None
self._duration = value
@property
def all_day(self):
"""
Return:
bool: self is an all-day event
"""
# the event may have an end, also given in 'day' precision
return self._begin_precision == 'day'
[docs] def make_all_day(self):
"""Transforms self to an all-day event.
The event will span all the days from the begin to the end day.
"""
was_instant = self.duration == timedelta(0)
old_end = self.end
self._duration = None
self._begin_precision = 'day'
self._begin = self._begin.floor('day')
if was_instant:
self._end_time = None
return
floored_end = old_end.floor('day')
# this "overflooring" must be done because end times are not included in the interval
calculated_end = floored_end - timedelta(days=1) if floored_end == old_end else floored_end
if calculated_end == self._begin:
# for a one day event, we don't need to save the _end_time
self._end_time = None
else:
self._end_time = calculated_end
[docs] def __urepr__(self):
"""Should not be used directly. Use self.__repr__ instead.
Returns:
unicode: a unicode representation (__repr__) of the event.
"""
name = "'{}' ".format(self.name) if self.name else ''
if self.all_day:
if not self._end_time or self._begin == self._end_time:
return "<all-day Event {}{}>".format(name, self.begin.strftime("%F"))
else:
return "<all-day Event {}begin:{} end:{}>".format(name, self._begin.strftime("%F"), self._end_time.strftime("%F"))
elif self.begin is None:
return "<Event '{}'>".format(self.name) if self.name else "<Event>"
else:
return "<Event {}begin:{} end:{}>".format(name, self.begin, self.end)
def starts_within(self, other):
if not isinstance(other, Event):
raise NotImplementedError(
'Cannot compare Event and {}'.format(type(other)))
return self.begin >= other.begin and self.begin <= other.end
def ends_within(self, other):
if not isinstance(other, Event):
raise NotImplementedError(
'Cannot compare Event and {}'.format(type(other)))
return self.end >= other.begin and self.end <= other.end
def intersects(self, other):
if not isinstance(other, Event):
raise NotImplementedError(
'Cannot compare Event and {}'.format(type(other)))
return (self.starts_within(other)
or self.ends_within(other)
or other.starts_within(self)
or other.ends_within(self))
__xor__ = intersects
def includes(self, other):
if isinstance(other, Event):
return other.starts_within(self) and other.ends_within(self)
if isinstance(other, datetime):
return self.begin <= other and self.end >= other
raise NotImplementedError(
'Cannot compare Event and {}'.format(type(other)))
def is_included_in(self, other):
if isinstance(other, Event):
return other.includes(self)
raise NotImplementedError(
'Cannot compare Event and {}'.format(type(other)))
__in__ = is_included_in
[docs] def __lt__(self, other):
if isinstance(other, Event):
if self.begin is None and other.begin is None:
if self.name is None and other.name is None:
return False
elif self.name is None:
return True
elif other.name is None:
return False
else:
return self.name < other.name
return self.begin < other.begin
if isinstance(other, datetime):
return self.begin < other
raise NotImplementedError(
'Cannot compare Event and {}'.format(type(other)))
[docs] def __le__(self, other):
if isinstance(other, Event):
if self.begin is None and other.begin is None:
if self.name is None and other.name is None:
return True
elif self.name is None:
return True
elif other.name is None:
return False
else:
return self.name <= other.name
return self.begin <= other.begin
if isinstance(other, datetime):
return self.begin <= other
raise NotImplementedError(
'Cannot compare Event and {}'.format(type(other)))
[docs] def __gt__(self, other):
if isinstance(other, Event):
if self.begin is None and other.begin is None:
# TODO : handle py3 case when a name is None
return self.name > other.name
return self.begin > other.begin
if isinstance(other, datetime):
return self.begin > other
raise NotImplementedError(
'Cannot compare Event and {}'.format(type(other)))
[docs] def __ge__(self, other):
if isinstance(other, Event):
if self.begin is None and other.begin is None:
# TODO : handle py3 case when a name is None
return self.name >= other.name
return self.begin >= other.begin
if isinstance(other, datetime):
return self.begin >= other
raise NotImplementedError(
'Cannot compare Event and {}'.format(type(other)))
def __or__(self, other):
if isinstance(other, Event):
begin, end = None, None
if self.begin and other.begin:
begin = max(self.begin, other.begin)
if self.end and other.end:
end = min(self.end, other.end)
return (begin, end) if begin and end and begin < end else (None, None)
raise NotImplementedError(
'Cannot compare Event and {}'.format(type(other)))
[docs] def __eq__(self, other):
"""Two events are considered equal if they have the same uid."""
if isinstance(other, Event):
return self.uid == other.uid
raise NotImplementedError(
'Cannot compare Event and {}'.format(type(other)))
def time_equals(self, other):
return (self.begin == other.begin) and (self.end == other.end)
[docs] def join(self, other, *args, **kwarg):
"""Create a new event which covers the time range of two intersecting events
All extra parameters are passed to the Event constructor.
Args:
other: the other event
Returns:
a new Event instance
"""
event = Event(*args, **kwarg)
if self.intersects(other):
if self.starts_within(other):
event.begin = other.begin
else:
event.begin = self.begin
if self.ends_within(other):
event.end = other.end
else:
event.end = self.end
return event
raise ValueError('Cannot join {} with {}: they don\'t intersect.'.format(self, other))
__and__ = join
[docs] def clone(self):
"""
Returns:
Event: an exact copy of self"""
clone = copy.copy(self)
clone._unused = clone._unused.clone()
clone.alarms = copy.copy(self.alarms)
clone.categories = copy.copy(self.categories)
return clone
[docs] def __hash__(self):
"""
Returns:
int: hash of self. Based on self.uid."""
return int(''.join(map(lambda x: '%.3d' % ord(x), self.uid)))
# ------------------
# ----- Inputs -----
# ------------------
@Event._extracts('DTSTAMP')
def created(event, line):
if line:
# get the dict of vtimezones passed to the classmethod
tz_dict = event._classmethod_kwargs['tz']
event.created = iso_to_arrow(line, tz_dict)
@Event._extracts('DTSTART')
def start(event, line):
if line:
# get the dict of vtimezones passed to the classmethod
tz_dict = event._classmethod_kwargs['tz']
event.begin = iso_to_arrow(line, tz_dict)
event._begin_precision = iso_precision(line.value)
@Event._extracts('DURATION')
def duration(event, line):
if line:
#TODO: DRY [1]
if event._end_time: # pragma: no cover
raise ValueError("An event can't have both DTEND and DURATION")
event._duration = parse_duration(line.value)
@Event._extracts('DTEND')
def end(event, line):
if line:
#TODO: DRY [1]
if event._duration:
raise ValueError("An event can't have both DTEND and DURATION")
# get the dict of vtimezones passed to the classmethod
tz_dict = event._classmethod_kwargs['tz']
event._end_time = iso_to_arrow(line, tz_dict)
# one could also save the end_precision to check that if begin_precision is day, end_precision also is
@Event._extracts('SUMMARY')
def summary(event, line):
event.name = unescape_string(line.value) if line else None
@Event._extracts('DESCRIPTION')
def description(event, line):
event.description = unescape_string(line.value) if line else None
@Event._extracts('LOCATION')
def location(event, line):
event.location = unescape_string(line.value) if line else None
@Event._extracts('URL')
def url(event, line):
event.url = unescape_string(line.value) if line else None
@Event._extracts('TRANSP')
def transparent(event, line):
if line:
event.transparent = line.value == 'TRANSPARENT'
# TODO : make uid required ?
# TODO : add option somewhere to ignore some errors
@Event._extracts('UID')
def uid(event, line):
if line:
event.uid = line.value
@Event._extracts('VALARM', multiple=True)
def alarms(event, lines):
def alarm_factory(x):
af = AlarmFactory.get_type_from_container(x)
return af._from_container(x)
event.alarms = list(map(alarm_factory, lines))
@Event._extracts('CATEGORIES')
def categories(event, line):
event.categories = set()
if line:
# In the regular expression: Only match unquoted commas.
for cat in re.split("(?<!\\\\),", line.value):
event.categories.update({unescape_string(cat)})
# -------------------
# ----- Outputs -----
# -------------------
@Event._outputs
def o_created(event, container):
if event.created:
instant = event.created
else:
instant = arrow.now()
container.append(ContentLine('DTSTAMP', value=arrow_to_iso(instant)))
@Event._outputs
def o_start(event, container):
if event.begin and not event.all_day:
container.append(ContentLine('DTSTART', value=arrow_to_iso(event.begin)))
@Event._outputs
def o_all_day(event, container):
if event.begin and event.all_day:
container.append(ContentLine('DTSTART', params={'VALUE': ('DATE',)},
value=arrow_date_to_iso(event.begin)))
@Event._outputs
def o_duration(event, container):
# TODO : DURATION
if event._duration and event.begin:
representation = timedelta_to_duration(event._duration)
container.append(ContentLine('DURATION', value=representation))
@Event._outputs
def o_end(event, container):
if event.begin and event._end_time:
container.append(ContentLine('DTEND', value=arrow_to_iso(event.end)))
@Event._outputs
def o_summary(event, container):
if event.name:
container.append(ContentLine('SUMMARY', value=escape_string(event.name)))
@Event._outputs
def o_description(event, container):
if event.description:
container.append(ContentLine('DESCRIPTION', value=escape_string(event.description)))
@Event._outputs
def o_location(event, container):
if event.location:
container.append(ContentLine('LOCATION', value=escape_string(event.location)))
@Event._outputs
def o_url(event, container):
if event.url:
container.append(ContentLine('URL', value=escape_string(event.url)))
@Event._outputs
def o_transparent(event, container):
if event.transparent:
container.append(ContentLine('TRANSP', value=escape_string('TRANSPARENT')))
else:
container.append(ContentLine('TRANSP', value=escape_string('OPAQUE')))
@Event._outputs
def o_uid(event, container):
if event.uid:
uid = event.uid
else:
uid = uid_gen()
container.append(ContentLine('UID', value=uid))
@Event._outputs
def o_alarm(event, container):
for alarm in event.alarms:
container.append(str(alarm))
@Event._outputs
def o_categories(event, container):
if bool(event.categories):
container.append(ContentLine('CATEGORIES', value=','.join([escape_string(s) for s in event.categories])))