Package flumotion :: Package component :: Package base :: Module scheduler
[hide private]

Source Code for Module flumotion.component.base.scheduler

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_base_scheduler -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  from datetime import datetime, timedelta 
 23   
 24  from twisted.internet import reactor 
 25   
 26  from flumotion.common import log 
 27  from flumotion.component.base import watcher 
 28  from flumotion.common.eventcalendar import parseCalendar, parseCalendarFromFile 
 29  from flumotion.common.eventcalendar import Event, LOCAL, EventSet 
 30   
 31  __version__ = "$Rev: 7096 $" 
 32   
 33   
34 -class Scheduler(log.Loggable):
35 """ 36 I keep track of upcoming events. 37 38 I can provide notifications when events end and start, and maintain 39 a set of current events. 40 """ 41 windowSize = timedelta(days=1) 42
43 - def __init__(self):
44 self._delayedCall = None 45 self._subscribeId = 0 46 self.subscribers = {} 47 self._eventSets = {} 48 self._nextStart = 0
49
50 - def _addEvent(self, event):
51 self.debug("adding event %s", event.uid) 52 uid = event.uid 53 if uid not in self._eventSets: 54 self._eventSets[uid] = EventSet(uid) 55 self._eventSets[uid].addEvent(event) 56 if event.start < event.now < event.end: 57 self._eventStarted(event)
58
59 - def addEvent(self, uid, start, end, content, rrule=None, now=None, 60 exdates=None):
61 """Add a new event to the scheduler 62 63 @param uid: uid of event 64 @type uid: str 65 @param start: wall-clock time of event start 66 @type start: datetime 67 @param end: wall-clock time of event end 68 @type end: datetime 69 @param content: content of this event 70 @type content: str 71 @param rrule: recurrence rule, either as a string parseable by 72 datetime.rrule.rrulestr or as a datetime.timedelta 73 @type rrule: None, str, or datetime.timedelta 74 75 @returns: an Event that can later be passed to removeEvent, if 76 so desired. The event will be removed or rescheduled 77 automatically when it ends. 78 """ 79 80 if now is None: 81 now = datetime.now(LOCAL) 82 event = Event(uid, start, end, content, rrule=rrule, exdates=exdates, 83 now=now) 84 if event.end < now and not rrule: 85 self.warning('attempted to schedule event in the past: %r', 86 event) 87 return event 88 89 self._addEvent(event) 90 self._reschedule() 91 return event
92
93 - def removeEvent(self, event):
94 """Remove an event from the scheduler. 95 96 @param event: an event, as returned from addEvent() 97 @type event: Event 98 """ 99 self._removeEvent(event) 100 self._reschedule()
101
102 - def _removeEvent(self, event):
103 uid = event.uid 104 if uid not in self._eventSets: 105 return 106 current = self.getCurrentEvents() 107 if event in current: 108 self._eventEnded(event) 109 self._eventSets[uid].removeEvent(event)
110
111 - def getCurrentEvents(self, now=None, windowSize=None):
112 """Get a list of current events. 113 @param now: Use now as localtime. If not set the local time is used. 114 @type now: datetime 115 @param windowSize: get events on this window. If not set, the returned 116 events will be on the class windowSize member. 117 @type windowSize: timedelta 118 119 @return: Events that are being run. 120 @rtype: L{Event} 121 """ 122 if now is None: 123 now = datetime.now(LOCAL) 124 if windowSize is None: 125 windowSize = timedelta(seconds=0) 126 current = [] 127 for eventSet in self._eventSets.values(): 128 points = eventSet.getPoints(now, now + windowSize) 129 for point in points: 130 event = point.eventInstance.event 131 event.currentStart = point.eventInstance.currentStart 132 event.currentEnd = point.eventInstance.currentEnd 133 if not event in current: 134 current.append(event) 135 return current
136
137 - def addEvents(self, events):
138 """ 139 Add a new list of events to the schedule. 140 141 @param events: the new events 142 @type events: list of Event 143 """ 144 result = [] 145 for event in events: 146 e = self._addEvent(event) 147 result.append(e) 148 self._reschedule() 149 return result
150
151 - def replaceEvents(self, events, now=None):
152 """Replace the set of events in the scheduler. 153 154 This function is different than simply removing all events then 155 adding new ones, because it tries to avoid spurious 156 ended/started notifications. 157 158 @param events: the new events 159 @type events: a sequence of Event 160 """ 161 if now is None: 162 now = datetime.now(LOCAL) 163 currentEvents = self.getCurrentEvents() 164 for _, eventSet in self._eventSets.iteritems(): 165 eventsToRemove = eventSet.getEvents()[:] 166 for event in eventsToRemove: 167 if event not in currentEvents: 168 self._removeEvent(event) 169 for event in events: 170 self.debug("adding event %r", event.uid) 171 if event.start > now or event.rrule: 172 self._addEvent(event) 173 else: 174 self.debug("event is a past event and it is not added") 175 self._reschedule()
176
177 - def subscribe(self, eventStarted, eventEnded):
178 """ 179 Subscribe to event happenings in the scheduler. 180 181 @param eventStarted: function that will be called when an event starts 182 @type eventStarted: function taking L{Event} 183 @param eventEnded: function that will be called when an event ends 184 @type eventEnded: function taking L{Event} 185 186 @returns: A subscription ID that can later be passed to 187 unsubscribe(). 188 """ 189 sid = self._subscribeId 190 self._subscribeId += 1 191 self.subscribers[sid] = (eventStarted, eventEnded) 192 return sid
193
194 - def unsubscribe(self, id):
195 """Unsubscribe from event happenings in the scheduler. 196 197 @param id: Subscription ID received from subscribe() 198 """ 199 del self.subscribers[id]
200
201 - def _eventStarted(self, event):
202 for started, _ in self.subscribers.values(): 203 started(event)
204
205 - def _eventEnded(self, event):
206 for _, ended in self.subscribers.values(): 207 ended(event)
208
209 - def _reschedule(self):
210 211 def _getNextEvent(now): 212 earliest = now + self.windowSize 213 which = None 214 result = None 215 for event in self.getCurrentEvents(now, self.windowSize): 216 self.debug("current event %s", event.uid) 217 if event.currentStart < earliest and event.currentStart > now: 218 earliest = event.currentStart 219 which = 'start' 220 result = event 221 if event.currentEnd < earliest: 222 earliest = event.currentEnd 223 which = 'end' 224 result = event 225 return result, which
226 227 def doStart(e): 228 self._eventStarted(e) 229 self._reschedule()
230 231 def doEnd(e): 232 self._eventEnded(e) 233 self._eventSets[e.uid].removeEvent(e) 234 self._reschedule() 235 236 self.debug("schedule events") 237 self._cancelScheduledCalls() 238 239 now = datetime.now(LOCAL) 240 241 event, which = _getNextEvent(now) 242 243 def toSeconds(td): 244 return max(td.days*24*3600 + td.seconds + td.microseconds/1e6, 0) 245 246 if event: 247 if which == 'start': 248 self.debug("schedule start event at %s", str(event.currentStart - now)) 249 250 seconds = toSeconds(event.currentStart - now) 251 dc = reactor.callLater(seconds, doStart, event) 252 elif which == 'end': 253 self.debug("schedule end event at %s", str(event.currentEnd - now)) 254 255 seconds = toSeconds(event.currentEnd - now) 256 dc = reactor.callLater(seconds, doEnd, event) 257 else: 258 self.debug("schedule rescheduling at %s", str(self.windowSize)) 259 260 seconds = toSeconds(self.windowSize) 261 dc = reactor.callLater(seconds, self._reschedule) 262 self._nextStart = seconds 263 self._delayedCall = dc 264
265 - def _cancelScheduledCalls(self):
266 if self._delayedCall: 267 if self._delayedCall.active(): 268 self._delayedCall.cancel() 269 self._delayedCall = None
270
271 -class ICalScheduler(Scheduler):
272
273 - def __init__(self, fileObj):
274 """ 275 I am a scheduler that takes its data from an ical file and watches 276 that file every timeout. Very important: only future events will 277 be added, not past nor present. 278 @param fileObj: The fileObj. It must be already opened. 279 @type fileObj: open file. 280 """ 281 Scheduler.__init__(self) 282 if not fileObj: 283 return 284 285 def parseFromFile(f): 286 eventSets = parseCalendarFromFile(f) 287 self._setEventSets(eventSets)
288 parseFromFile(fileObj) 289 290 if hasattr(fileObj, 'name'): 291 def fileChanged(f): 292 self.debug("ics file changed") 293 parseFromFile(open(f, 'r'))
294 self.watcher = watcher.FilesWatcher([fileObj.name]) 295 self.watcher.subscribe(fileChanged=fileChanged) 296 self.watcher.start() 297
298 - def _setEventSets(self, eventSets):
299 events = [] 300 for eventSet in eventSets: 301 self.debug("add eventset %s", eventSet.uid) 302 events.extend(eventSet.getEvents()) 303 self.replaceEvents(events)
304
305 - def parseCalendar(self, calendar):
306 eventSets = parseCalendar(calendar) 307 self._setEventSets(eventSets)
308
309 - def stopWatchingIcalFile(self):
310 """ 311 Stop watching the ical file. 312 """ 313 self.watcher.stop()
314