Package flumotion :: Package component :: Package consumers :: Package disker :: Module disker
[hide private]

Source Code for Module flumotion.component.consumers.disker.disker

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import errno 
 23  import os 
 24  import time 
 25  from datetime import datetime 
 26   
 27  import gobject 
 28  import gst 
 29  import time 
 30   
 31  from twisted.internet import reactor 
 32   
 33  from flumotion.component import feedcomponent 
 34  from flumotion.common import log, gstreamer, pygobject, messages, errors 
 35   
 36  # proxy import 
 37  from flumotion.component.component import moods 
 38  from flumotion.common.pygobject import gsignal 
 39   
 40  from flumotion.common.messages import N_ 
 41  T_ = messages.gettexter('flumotion') 
 42   
 43  __all__ = ['Disker'] 
 44   
 45  try: 
 46      # icalendar and dateutil modules needed for scheduling recordings 
 47      from icalendar import Calendar 
 48      from dateutil import rrule 
 49      HAS_ICAL = True 
 50  except: 
 51      HAS_ICAL = False 
 52   
53 -class DiskerMedium(feedcomponent.FeedComponentMedium):
54 # called when admin ui wants to stop recording. call changeFilename to 55 # restart
56 - def remote_stopRecording(self):
57 self.comp.stop_recording()
58 59 # called when admin ui wants to change filename (this starts recording if 60 # the disker isn't currently writing to disk)
61 - def remote_changeFilename(self, filenameTemplate=None):
62 self.comp.change_filename(filenameTemplate)
63
64 - def remote_scheduleRecordings(self, ical):
65 self.comp.parse_ical(ical)
66 67 # called when admin ui wants updated state (current filename info)
68 - def remote_notifyState(self):
69 self.comp.update_ui_state()
70
71 -class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
72 componentMediumClass = DiskerMedium 73 checkOffset = True 74 pipe_template = 'multifdsink sync-method=1 name=fdsink mode=1 sync=false' 75 file = None 76 directory = None 77 location = None 78 caps = None 79
80 - def init(self):
81 self.uiState.addKey('filename', None) 82 self.uiState.addKey('recording', False) 83 self.uiState.addKey('can-schedule', HAS_ICAL)
84
85 - def get_pipeline_string(self, properties):
86 directory = properties['directory'] 87 88 self.directory = directory 89 90 self.fixRenamedProperties(properties, [('rotateType', 'rotate-type')]) 91 92 rotateType = properties.get('rotate-type', 'none') 93 94 # validate rotate-type and size/time properties first 95 if not rotateType in ['none', 'size', 'time']: 96 m = messages.Error(T_(N_( 97 "The configuration property 'rotate-type' should be set to " 98 "'size', time', or 'none', not '%s'. " 99 "Please fix the configuration."), 100 rotateType), id='rotate-type') 101 self.addMessage(m) 102 raise errors.ComponentSetupHandledError() 103 104 # size and time types need the property specified 105 if rotateType in ['size', 'time']: 106 if rotateType not in properties.keys(): 107 m = messages.Error(T_(N_( 108 "The configuration property '%s' should be set. " 109 "Please fix the configuration."), 110 rotateType), id='rotate-type') 111 self.addMessage(m) 112 raise errors.ComponentSetupHandledError() 113 114 # now act on the properties 115 if rotateType == 'size': 116 self.setSizeRotate(properties['size']) 117 elif rotateType == 'time': 118 self.setTimeRotate(properties['time']) 119 # FIXME: should add a way of saying "do first cycle at this time" 120 121 return self.pipe_template
122
123 - def setTimeRotate(self, time):
124 """ 125 @param time: duration of file (in seconds) 126 """ 127 reactor.callLater(time, self._rotateTimeCallback, time)
128
129 - def setSizeRotate(self, size):
130 """ 131 @param size: size of file (in bytes) 132 """ 133 reactor.callLater(5, self._rotateSizeCallback, size)
134
135 - def _rotateTimeCallback(self, time):
136 self.change_filename() 137 138 # Add a new one 139 reactor.callLater(time, self._rotateTimeCallback, time)
140
141 - def _rotateSizeCallback(self, size):
142 if not self.location: 143 self.warning('Cannot rotate file, no file location set') 144 else: 145 if os.stat(self.location).st_size > size: 146 self.change_filename() 147 148 # Add a new one 149 reactor.callLater(5, self._rotateTimeCallback, size)
150
151 - def get_mime(self):
152 if self.caps: 153 return self.caps.get_structure(0).get_name()
154
155 - def get_content_type(self):
156 mime = self.get_mime() 157 if mime == 'multipart/x-mixed-replace': 158 mime += ";boundary=ThisRandomString" 159 return mime
160
161 - def change_filename(self, filenameTemplate=None):
162 """ 163 @param filenameTemplate: strftime formatted string to decide filename 164 """ 165 self.debug("change_filename()") 166 mime = self.get_mime() 167 if mime == 'application/ogg': 168 ext = 'ogg' 169 elif mime == 'multipart/x-mixed-replace': 170 ext = 'multipart' 171 elif mime == 'audio/mpeg': 172 ext = 'mp3' 173 elif mime == 'video/x-msvideo': 174 ext = 'avi' 175 elif mime == 'video/x-ms-asf': 176 ext = 'asf' 177 elif mime == 'audio/x-flac': 178 ext = 'flac' 179 elif mime == 'audio/x-wav': 180 ext = 'wav' 181 elif mime == 'video/x-matroska': 182 ext = 'mkv' 183 elif mime == 'video/x-dv': 184 ext = 'dv' 185 else: 186 ext = 'data' 187 188 sink = self.get_element('fdsink') 189 if sink.get_state() == gst.STATE_NULL: 190 sink.set_state(gst.STATE_READY) 191 192 if self.file: 193 self.file.flush() 194 sink.emit('remove', self.file.fileno()) 195 self.file = None 196 if self.symlink_to_last_recording: 197 self.update_symlink(self.location, 198 self.symlink_to_last_recording) 199 200 filename = "" 201 if not filenameTemplate: 202 filenameTemplate = self._defaultFilenameTemplate 203 filename = "%s.%s" % (time.strftime(filenameTemplate, 204 time.localtime()), ext) 205 self.location = os.path.join(self.directory, filename) 206 207 try: 208 self.file = open(self.location, 'a') 209 except IOError, e: 210 self.warning("Failed to open output file %s: %s", 211 self.location, log.getExceptionMessage(e)) 212 m = messages.Error(T_(N_("Failed to open output file " 213 "%s. Check your permissions." 214 % (self.location,)))) 215 self.addMessage(m) 216 return 217 sink.emit('add', self.file.fileno()) 218 self.uiState.set('filename', self.location) 219 self.uiState.set('recording', True) 220 221 if self.symlink_to_current_recording: 222 self.update_symlink(self.location, 223 self.symlink_to_current_recording)
224 246
247 - def stop_recording(self):
248 sink = self.get_element('fdsink') 249 if sink.get_state() == gst.STATE_NULL: 250 sink.set_state(gst.STATE_READY) 251 252 if self.file: 253 self.file.flush() 254 sink.emit('remove', self.file.fileno()) 255 self.file = None 256 self.uiState.set('filename', None) 257 self.uiState.set('recording', False) 258 if self.symlink_to_last_recording: 259 self.update_symlink(self.location, 260 self.symlink_to_last_recording)
261
262 - def _notify_caps_cb(self, pad, param):
263 caps = pad.get_negotiated_caps() 264 if caps == None: 265 return 266 267 caps_str = gstreamer.caps_repr(caps) 268 self.debug('Got caps: %s' % caps_str) 269 270 new = True 271 if not self.caps == None: 272 self.warning('Already had caps: %s, replacing' % caps_str) 273 new = False 274 275 self.debug('Storing caps: %s' % caps_str) 276 self.caps = caps 277 278 if new and self._recordAtStart: 279 reactor.callLater(0, self.change_filename)
280 281 # callback for when a client is removed so we can figure out 282 # errors
283 - def _client_removed_cb(self, element, arg0, client_status):
284 # check if status is error 285 if client_status == 4: 286 reactor.callFromThread(self._client_error_cb)
287
288 - def _client_error_cb(self):
289 self.file.close() 290 self.file = None 291 # make element sad 292 self.setMood(moods.sad) 293 id = "error-writing-%s" % self.location 294 m = messages.Error(T_(N_( 295 "Error writing to file %s. Maybe disk is full." % ( 296 self.location))), 297 id=id, priority=40) 298 self.state.append('messages', m)
299
300 - def configure_pipeline(self, pipeline, properties):
301 self.debug('configure_pipeline for disker') 302 self.symlink_to_last_recording = \ 303 properties.get('symlink-to-last-recording', None) 304 self.symlink_to_current_recording = \ 305 properties.get('symlink-to-current-recording', None) 306 self._recordAtStart = properties.get('start-recording', True) 307 self._defaultFilenameTemplate = properties.get('filename', 308 '%s.%%Y%%m%%d-%%H%%M%%S' % self.getName()) 309 icalfn = properties.get('ical-schedule') 310 if icalfn: 311 ical = open(icalfn, "rb").read() 312 self.parse_ical(ical) 313 self._recordAtStart = False 314 315 sink = self.get_element('fdsink') 316 sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb) 317 # connect to client-removed so we can detect errors in file writing 318 sink.connect('client-removed', self._client_removed_cb)
319 320 # add code that lets recordings be schedules 321 # TODO: resolve overlapping events
322 - def schedule_recording(self, whenStart, whenEnd, recur=None, 323 filenameTemplate=None):
324 """ 325 Sets a recording to start at a time in the future for a specified 326 duration. 327 @param whenStart time of when to start recording 328 @type whenStart datetime 329 @param whenEnd time of when to end recording 330 @type whenEnd datetime 331 @param recur recurrence rule 332 @type recur icalendar.props.vRecur 333 @param filenameTemplate strftime formatted string to decide filename 334 @type filenameTemplate string 335 """ 336 now = datetime.now() 337 338 startRecurRule = None 339 endRecurRule = None 340 341 if recur: 342 self.debug("Have a recurrence rule, parsing") 343 # create dateutil.rrule from the recurrence rules 344 startRecurRule = rrule.rrulestr(recur.ical(), dtstart=whenStart) 345 endRecurRule = rrule.rrulestr(recur.ical(), dtstart=whenEnd) 346 if now >= whenStart: 347 self.debug("Initial start before now (%r), finding new starts", 348 whenStart) 349 whenStart = startRecurRule.after(now) 350 whenEnd = endRecurRule.after(now) 351 self.debug("New start is now %r", whenStart) 352 353 if now < whenStart: 354 start = whenStart - now 355 startSecs = start.days * 86400 + start.seconds 356 self.debug("scheduling a recording %d seconds away", startSecs) 357 reactor.callLater(startSecs, 358 self.start_scheduled_recording, startRecurRule, whenStart, 359 filenameTemplate) 360 end = whenEnd - now 361 endSecs = end.days * 86400 + end.seconds 362 reactor.callLater(endSecs, 363 self.stop_scheduled_recording, endRecurRule, whenEnd) 364 else: 365 self.warning("attempt to schedule in the past!")
366
367 - def start_scheduled_recording(self, recurRule, when, filenameTemplate):
368 self.change_filename(filenameTemplate) 369 if recurRule: 370 now = datetime.now() 371 nextTime = recurRule.after(when) 372 recurInterval = nextTime - now 373 self.debug("recurring start interval: %r", recurInterval) 374 recurIntervalSeconds = recurInterval.days * 86400 + \ 375 recurInterval.seconds 376 self.debug("recurring start in %d seconds", recurIntervalSeconds) 377 reactor.callLater(recurIntervalSeconds, 378 self.start_scheduled_recording, 379 recurRule, nextTime, filenameTemplate)
380
381 - def stop_scheduled_recording(self, recurRule, when):
382 self.stop_recording() 383 if recurRule: 384 now = datetime.now() 385 nextTime = recurRule.after(when) 386 recurInterval = nextTime - now 387 recurIntervalSeconds = recurInterval.days * 86400 + \ 388 recurInterval.seconds 389 self.debug("recurring stop in %d seconds", recurIntervalSeconds) 390 reactor.callLater(recurIntervalSeconds, 391 self.stop_scheduled_recording, 392 recurRule, nextTime)
393
394 - def parse_ical(self, icsStr):
395 if HAS_ICAL: 396 cal = Calendar.from_string(icsStr) 397 for event in cal.walk('vevent'): 398 dtstart = event.decoded('dtstart', '') 399 dtend = event.decoded('dtend', '') 400 summary = event.decoded('summary', None) 401 self.debug("event parsed with start: %r end: %r and summary: %s" 402 , dtstart, dtend, summary) 403 recur = event.get('rrule', None) 404 if dtstart and dtend: 405 self.schedule_recording(dtstart, dtend, recur, summary) 406 else: 407 self.warning("Cannot parse ICAL; neccesary modules not installed")
408 409 pygobject.type_register(Disker) 410