Package flumotion :: Package component :: Package consumers :: Package disker :: Module disker
[hide private]

Source Code for Module flumotion.component.consumers.disker.disker

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import errno 
 23  import os 
 24  import time 
 25  from datetime import datetime 
 26   
 27  import gobject 
 28  import gst 
 29  import time 
 30   
 31  from twisted.internet import reactor 
 32   
 33  from flumotion.component import feedcomponent 
 34  from flumotion.common import log, gstreamer, pygobject, messages, errors 
 35  from flumotion.common.format import strftime 
 36  from flumotion.common.i18n import N_, gettexter 
 37  from flumotion.common.mimetypes import mimeTypeToExtention 
 38  from flumotion.common.pygobject import gsignal 
 39  # proxy import 
 40  from flumotion.component.component import moods 
 41   
 42  __all__ = ['Disker'] 
 43  __version__ = "$Rev: 6995 $" 
 44  T_ = gettexter() 
 45   
 46   
 47   
 48  """ 
 49  Disker has a property 'ical-schedule'. This allows an ical file to be 
 50  specified in the config and have recordings scheduled based on events. 
 51  This file will be monitored for changes and events reloaded if this 
 52  happens. 
 53   
 54  The filename of a recording started from an ical file will be produced 
 55  via passing the ical event summary through strftime, so that an archive 
 56  can encode the date and time that it was begun. 
 57   
 58  The time that will be given to strftime will be given in the timezone of 
 59  the ical event. In practice this will either be UTC or the local time of 
 60  the machine running the disker, as the ical scheduler does not 
 61  understand arbitrary timezones. 
 62  """ 
 63   
 64  try: 
 65      # icalendar and dateutil modules needed for scheduling recordings 
 66      from icalendar import Calendar 
 67      from dateutil import rrule 
 68      HAS_ICAL = True 
 69  except ImportError: 
 70      HAS_ICAL = False 
 71   
72 -class DiskerMedium(feedcomponent.FeedComponentMedium):
73 # called when admin ui wants to stop recording. call changeFilename to 74 # restart
75 - def remote_stopRecording(self):
76 self.comp.stop_recording()
77 78 # called when admin ui wants to change filename (this starts recording if 79 # the disker isn't currently writing to disk)
80 - def remote_changeFilename(self, filenameTemplate=None):
81 self.comp.change_filename(filenameTemplate)
82 83 # called when admin ui wants updated state (current filename info)
84 - def remote_notifyState(self):
85 self.comp.update_ui_state()
86
87 -class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
88 componentMediumClass = DiskerMedium 89 checkOffset = True 90 pipe_template = 'multifdsink sync-method=1 name=fdsink mode=1 sync=false' 91 file = None 92 directory = None 93 location = None 94 caps = None 95
96 - def init(self):
97 self.uiState.addKey('filename', None) 98 self.uiState.addKey('recording', False) 99 self.uiState.addKey('can-schedule', HAS_ICAL)
100
101 - def get_pipeline_string(self, properties):
102 directory = properties['directory'] 103 104 self.directory = directory 105 106 self.fixRenamedProperties(properties, [('rotateType', 'rotate-type')]) 107 108 rotateType = properties.get('rotate-type', 'none') 109 110 # validate rotate-type and size/time properties first 111 if not rotateType in ['none', 'size', 'time']: 112 m = messages.Error(T_(N_( 113 "The configuration property 'rotate-type' should be set to " 114 "'size', time', or 'none', not '%s'. " 115 "Please fix the configuration."), 116 rotateType), mid='rotate-type') 117 self.addMessage(m) 118 raise errors.ComponentSetupHandledError() 119 120 # size and time types need the property specified 121 if rotateType in ['size', 'time']: 122 if rotateType not in properties.keys(): 123 m = messages.Error(T_(N_( 124 "The configuration property '%s' should be set. " 125 "Please fix the configuration."), 126 rotateType), mid='rotate-type') 127 self.addMessage(m) 128 raise errors.ComponentSetupHandledError() 129 130 # now act on the properties 131 if rotateType == 'size': 132 self.setSizeRotate(properties['size']) 133 elif rotateType == 'time': 134 self.setTimeRotate(properties['time']) 135 # FIXME: should add a way of saying "do first cycle at this time" 136 137 return self.pipe_template
138
139 - def setTimeRotate(self, time):
140 """ 141 @param time: duration of file (in seconds) 142 """ 143 reactor.callLater(time, self._rotateTimeCallback, time)
144
145 - def setSizeRotate(self, size):
146 """ 147 @param size: size of file (in bytes) 148 """ 149 reactor.callLater(5, self._rotateSizeCallback, size)
150
151 - def _rotateTimeCallback(self, time):
152 self.change_filename() 153 154 # Add a new one 155 reactor.callLater(time, self._rotateTimeCallback, time)
156
157 - def _rotateSizeCallback(self, size):
158 if not self.location: 159 self.warning('Cannot rotate file, no file location set') 160 else: 161 if os.stat(self.location).st_size > size: 162 self.change_filename() 163 164 # Add a new one 165 reactor.callLater(5, self._rotateTimeCallback, size)
166
167 - def get_mime(self):
168 if self.caps: 169 return self.caps.get_structure(0).get_name()
170
171 - def get_content_type(self):
172 mime = self.get_mime() 173 if mime == 'multipart/x-mixed-replace': 174 mime += ";boundary=ThisRandomString" 175 return mime
176
177 - def change_filename(self, filenameTemplate=None, timeOrTuple=None):
178 """ 179 @param filenameTemplate: strftime formatted string to decide filename 180 @param timeOrTuple: a valid time to pass to strftime, defaulting 181 to time.localtime(). A 9-tuple may be passed instead. 182 """ 183 mime = self.get_mime() 184 ext = mimeTypeToExtention(mime) 185 186 self.stop_recording() 187 188 sink = self.get_element('fdsink') 189 if sink.get_state() == gst.STATE_NULL: 190 sink.set_state(gst.STATE_READY) 191 192 filename = "" 193 if not filenameTemplate: 194 filenameTemplate = self._defaultFilenameTemplate 195 filename = "%s.%s" % (strftime(filenameTemplate, 196 timeOrTuple or time.localtime()), ext) 197 self.location = os.path.join(self.directory, filename) 198 self.info("Changing filename to %s", self.location) 199 try: 200 self.file = open(self.location, 'a') 201 except IOError, e: 202 self.warning("Failed to open output file %s: %s", 203 self.location, log.getExceptionMessage(e)) 204 m = messages.Error(T_(N_( 205 "Failed to open output file '%s' for writing. " 206 "Check permissions on the file."), self.location)) 207 self.addMessage(m) 208 return 209 self._plug_recording_started(self.file, self.location) 210 sink.emit('add', self.file.fileno()) 211 self.uiState.set('filename', self.location) 212 self.uiState.set('recording', True) 213 214 if self.symlink_to_current_recording: 215 self.update_symlink(self.location, 216 self.symlink_to_current_recording)
217 239
240 - def stop_recording(self):
241 sink = self.get_element('fdsink') 242 if sink.get_state() == gst.STATE_NULL: 243 sink.set_state(gst.STATE_READY) 244 245 if self.file: 246 self.file.flush() 247 sink.emit('remove', self.file.fileno()) 248 self._plug_recording_stopped(self.file, self.location) 249 self.file = None 250 self.uiState.set('filename', None) 251 self.uiState.set('recording', False) 252 if self.symlink_to_last_recording: 253 self.update_symlink(self.location, 254 self.symlink_to_last_recording)
255
256 - def _notify_caps_cb(self, pad, param):
257 caps = pad.get_negotiated_caps() 258 if caps == None: 259 return 260 261 caps_str = gstreamer.caps_repr(caps) 262 self.debug('Got caps: %s' % caps_str) 263 264 new = True 265 if not self.caps == None: 266 self.warning('Already had caps: %s, replacing' % caps_str) 267 new = False 268 269 self.debug('Storing caps: %s' % caps_str) 270 self.caps = caps 271 272 if new and self._recordAtStart: 273 reactor.callLater(0, self.change_filename, 274 self._startFilenameTemplate)
275 276 # callback for when a client is removed so we can figure out 277 # errors
278 - def _client_removed_cb(self, element, arg0, client_status):
279 # check if status is error 280 if client_status == 4: 281 reactor.callFromThread(self._client_error_cb)
282
283 - def _client_error_cb(self):
284 self.file.close() 285 self.file = None 286 # make element sad 287 self.setMood(moods.sad) 288 messageId = "error-writing-%s" % self.location 289 m = messages.Error(T_(N_( 290 "Error writing to file %s. Maybe disk is full." % ( 291 self.location))), 292 mid=messageId, priority=40) 293 self.addMessage(m)
294
295 - def configure_pipeline(self, pipeline, properties):
296 self.debug('configure_pipeline for disker') 297 self.symlink_to_last_recording = \ 298 properties.get('symlink-to-last-recording', None) 299 self.symlink_to_current_recording = \ 300 properties.get('symlink-to-current-recording', None) 301 self._recordAtStart = properties.get('start-recording', True) 302 self._defaultFilenameTemplate = properties.get('filename', 303 '%s.%%Y%%m%%d-%%H%%M%%S' % self.getName()) 304 self._startFilenameTemplate = self._defaultFilenameTemplate 305 icalfn = properties.get('ical-schedule') 306 if HAS_ICAL and icalfn: 307 from flumotion.component.base import scheduler 308 try: 309 self.icalScheduler = scheduler.ICalScheduler(open( 310 icalfn, 'r')) 311 self.icalScheduler.subscribe(self.eventStarted, 312 self.eventEnded) 313 currentEvents = self.icalScheduler.getCurrentEvents() 314 if currentEvents: 315 self._startFilenameTemplate = currentEvents[0].content 316 self._recordAtStart = True 317 else: 318 self._recordAtStart = False 319 except ValueError: 320 m = messages.Warning(T_(N_( 321 "Error parsing ical file %s, so not scheduling any" 322 " events." % icalfn)), mid="error-parsing-ical") 323 self.addMessage(m) 324 325 elif icalfn: 326 warnStr = "An ical file has been specified for " \ 327 "scheduling but the necessary modules " \ 328 "dateutil and/or icalendar are not installed" 329 self.warning(warnStr) 330 m = messages.Warning(T_(N_(warnStr)), mid="error-parsing-ical") 331 self.addMessage(m) 332 333 sink = self.get_element('fdsink') 334 sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb) 335 # connect to client-removed so we can detect errors in file writing 336 sink.connect('client-removed', self._client_removed_cb) 337 338 # set event probe if we should react to video mark events 339 react_to_marks = properties.get('react-to-stream-markers', False) 340 if react_to_marks: 341 pfx = properties.get('stream-marker-filename-prefix', '%03d.') 342 self._marker_prefix = pfx 343 sink.get_pad('sink').add_event_probe(self._markers_event_probe)
344
345 - def eventStarted(self, event):
346 self.debug('starting recording of %s', event.content) 347 self.change_filename(event.content, event.currentStart.timetuple())
348
349 - def eventEnded(self, event):
350 self.debug('ending recording of %s', event.content) 351 self.stop_recording()
352
353 - def parse_ical(self, icsStr):
354 if HAS_ICAL: 355 cal = Calendar.from_string(icsStr) 356 if self.icalScheduler: 357 events = self.icalScheduler.parseCalendar(cal) 358 if events: 359 self.icalScheduler.addEvents(events) 360 else: 361 self.warning("No events found in the ical string") 362 else: 363 self.warning("Cannot parse ICAL; neccesary modules not installed")
364
365 - def _plug_recording_started(self, file, location):
366 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug' 367 # make sure plugs are configured with our socket, see #732 368 if socket not in self.plugs: 369 return 370 for plug in self.plugs[socket]: 371 self.debug('invoking recording_started on ' 372 'plug %r on socket %s', plug, socket) 373 plug.recording_started(file, location)
374
375 - def _plug_recording_stopped(self, file, location):
376 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug' 377 # make sure plugs are configured with our socket, see #732 378 if socket not in self.plugs: 379 return 380 for plug in self.plugs[socket]: 381 self.debug('invoking recording_stopped on ' 382 'plug %r on socket %s', plug, socket) 383 plug.recording_stopped(file, location)
384
385 - def _markers_event_probe(self, element, event):
386 if event.type == gst.EVENT_CUSTOM_DOWNSTREAM: 387 evt_struct = event.get_structure() 388 if evt_struct.get_name() == 'FluStreamMark': 389 if evt_struct['action'] == 'start': 390 self._on_marker_start(evt_struct['prog_id']) 391 elif evt_struct['action'] == 'stop': 392 self._on_marker_stop() 393 return True
394
395 - def _on_marker_stop(self):
396 self.stop_recording()
397
398 - def _on_marker_start(self, data):
399 tmpl = self._defaultFilenameTemplate 400 if self._marker_prefix: 401 try: 402 tmpl = '%s%s' % (self._marker_prefix % data, 403 self._defaultFilenameTemplate) 404 except TypeError, err: 405 m = messages.Warning(T_(N_('Failed expanding filename prefix: ' 406 '%r <-- %r.'), 407 self._marker_prefix, data), 408 mid='expand-marker-prefix') 409 self.addMessage(m) 410 self.warning('Failed expanding filename prefix: ' 411 '%r <-- %r; %r' % 412 (self._marker_prefix, data, err)) 413 self.change_filename(tmpl)
414