Package flumotion :: Package component :: Package combiners :: Package switch :: Module switch
[hide private]

Source Code for Module flumotion.component.combiners.switch.switch

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import sets 
 23  import gst 
 24  import gobject 
 25   
 26  from twisted.internet import defer, reactor 
 27   
 28  from flumotion.common import errors, messages, log, python 
 29  from flumotion.common.i18n import N_, gettexter 
 30  from flumotion.common.planet import moods 
 31  from flumotion.component import feedcomponent 
 32  from flumotion.component.base import scheduler 
 33  from flumotion.component.plugs import base 
 34  from flumotion.worker.checks import check 
 35   
 36  __version__ = "$Rev: 6995 $" 
 37  T_ = gettexter() 
 38   
 39   
class SwitchMedium(feedcomponent.FeedComponentMedium):
    """
    Medium whose remote_* methods change the logical feed of a switch
    component.

    Every method forwards the request to the component's switch_to() and
    returns whatever that call returns.
    """

    def remote_switchToMaster(self):
        """
        Ask the component to switch to the "master" logical feed.
        """
        component = self.comp
        return component.switch_to("master")

    def remote_switchToBackup(self):
        """
        Ask the component to switch to the "backup" logical feed.
        """
        component = self.comp
        return component.switch_to("backup")

    def remote_switchTo(self, logicalFeed):
        """
        Ask the component to switch to an arbitrary logical feed.

        @param logicalFeed: name of the logical feed to switch to
        """
        component = self.comp
        return component.switch_to(logicalFeed)
49
50 -class ICalSwitchPlug(base.ComponentPlug):
51 - def start(self, component):
52 self._sid = None 53 self.sched = None 54 try: 55 def eventStarted(event): 56 self.debug("event started %r", event.uid) 57 component.switch_to("backup")
58 def eventEnded(event): 59 self.debug("event ended %r", event.uid) 60 component.switch_to("master")
61 62 # if an event starts, semantics are to switch to backup 63 # if an event ends, semantics are to switch to master 64 filename = self.args['properties']['ical-schedule'] 65 self.sched = scheduler.ICalScheduler(open(filename, 'r')) 66 self._sid = self.sched.subscribe(eventStarted, eventEnded) 67 if self.sched.getCurrentEvents(): 68 component.idealFeed = "backup" 69 except ValueError: 70 fmt = N_("Error parsing ical file %s, so not scheduling " 71 "any events.") 72 component.addWarning("error-parsing-ical", fmt, filename) 73 except ImportError, e: 74 fmt = N_("An ical file has been specified for scheduling, " 75 "but the necessary modules are not installed.") 76 component.addWarning("error-parsing-ical", fmt, debug=e.message) 77
78 - def stop(self, component):
79 if self.sched: 80 self.sched.unsubscribe(self._sid)
81
class Switch(feedcomponent.MultiInputParseLaunchComponent):
    """
    Base class for components that switch between logical feeds.

    A logical feed (for example "master" or "backup") is a named group of
    eaters that are switched together. Subclasses must implement
    get_logical_feeds() and get_switch_elements().
    """
    logCategory = 'switch'
    componentMediumClass = SwitchMedium

    def init(self):
        """
        Set up the UI state key and the bookkeeping attributes.
        """
        self.uiState.addKey("active-eater")
        self.icalScheduler = None

        # This structure maps logical feeds to sets of eaters. For
        # example, "master" and "backup" could be logical feeds, and
        # would be the keys in this dict, mapping to lists of eater
        # aliases corresponding to those feeds. The length of each of
        # those lists is equal to the number of feeders that the element
        # has, which is the number of individual streams in a logical
        # feed.
        #
        # For example, {"master": ["audio-master", "video-master"],
        #               "backup": ["audio-backup", "video-backup"]}
        # logical feed name -> [eater alias]
        self.logicalFeeds = {}
        # logical feed names in order of preference
        self.feedsByPriority = []

        # eater alias -> (sink pad, switch element)
        self.switchPads = {}

        # Two variables form the state of the switch component.
        #    idealFeed
        #        The feed that we would like to provide, as chosen by
        #        the user, either by the UI, an ical file, a pattern
        #        detection, etc.
        #    activeFeed
        #        The feed currently being provided
        self.idealFeed = None
        self.activeFeed = None

    def addWarning(self, id, format, *args, **kwargs):
        """
        Log a warning and add it to the component as a WARNING message.

        @param id:     message id, usable with clearWarning()
        @param format: format string for the message
        @param args:   arguments for the format string
        @param kwargs: extra keyword arguments for messages.Message
        """
        self.warning(format, *args)
        m = messages.Message(messages.WARNING, T_(format, *args),
                             id=id, **kwargs)
        self.addMessage(m)

    def clearWarning(self, id):
        """
        Remove every message with the given id from the component state.

        @param id: id of the message(s) to remove
        """
        # iterate over a copy, as the list is modified while looping
        for m in self.state.get('messages')[:]:
            if m.id == id:
                self.state.remove('messages', m)

    def do_check(self):
        """
        Check for an input-selector element that has a 'block' signal.

        @returns: the deferred from check.checkPlugin(), with a callback
                  that copies the check's messages onto this component
                  and passes on the result's value.
        """
        def checkSignal(fact):
            fact = fact.load()
            signals = gobject.signal_list_names(fact.get_element_type())
            return 'block' in signals

        def cb(result):
            for m in result.messages:
                self.addMessage(m)
            return result.value

        self.debug("checking for input-selector element")
        d = check.checkPlugin('selector', 'gst-plugins-bad',
                              (0, 10, 5, 2), 'input-selector', checkSignal)
        d.addCallback(cb)
        return d

    def do_setup(self):
        """
        Start an ICalSwitchPlug if the 'ical-schedule' property is set.
        """
        ical = self.config['properties'].get('ical-schedule', None)
        if ical:
            args = {'properties': {'ical-schedule': ical}}
            self.icalScheduler = ICalSwitchPlug(args)
            self.icalScheduler.start(self)

    def create_pipeline(self):
        """
        Register the logical feeds, then create the pipeline through the
        parent class.

        The first logical feed becomes the ideal feed unless one has
        already been chosen.
        """
        for name, aliases in self.get_logical_feeds():
            assert name not in self.logicalFeeds
            for alias in aliases:
                assert alias in self.eaters
            self.logicalFeeds[name] = aliases
            if self.idealFeed is None:
                self.debug("idealFeed being set to %s", name)
                self.idealFeed = name
            self.feedsByPriority.append(name)

        return feedcomponent.MultiInputParseLaunchComponent.create_pipeline(
            self)

    def get_logical_feeds(self):
        """
        Return the logical feeds in order of preference.

        @returns: a list of (logical feed name, [eater alias]) tuples
        """
        raise errors.NotImplementedError('subclasses should implement '
                                         'get_logical_feeds')

    def configure_pipeline(self, pipeline, properties):
        """
        Find the switch sink pad belonging to each eater, activate the
        pads of the ideal feed and install the eater watches.

        @param pipeline:   the pipeline that was created
        @param properties: the component properties (unused here)
        """
        def getDownstreamElement(e):
            # follow the first source pad of e to its peer
            for pad in e.pads():
                if pad.get_direction() is gst.PAD_SRC:
                    peer = pad.get_peer()
                    return peer, peer.get_parent()
            raise AssertionError('failed to find the switch')

        switchElements = self.get_switch_elements(pipeline)
        for alias in self.eaters:
            e = pipeline.get_by_name(self.eaters[alias].elementName)
            pad = None
            # walk downstream until we hit one of the switch elements;
            # pad is then the sink pad on that switch
            while e not in switchElements:
                self.log("Element: %s", e.get_name())
                pad, e = getDownstreamElement(e)
            self.debug('eater %s maps to pad %s', alias, pad)
            self.switchPads[alias] = pad, e

        # set active pad correctly on each of the switch elements
        # (pad, switch)
        pairs = [self.switchPads[alias]
                 for alias in self.logicalFeeds[self.idealFeed]]

        for p, s in pairs:
            s.set_property('active-pad', p)
        self.activeFeed = self.idealFeed
        self.uiState.set("active-eater", self.idealFeed)

        self.install_logical_feed_watches()

        self.do_switch()

    def install_logical_feed_watches(self):
        """
        Watch all eaters and translate their activity changes into
        feedSetActive() and feedSetInactive() calls for their logical feed.
        """
        def eaterSetActive(eaterAlias):
            for feed, aliases in self.logicalFeeds.items():
                if eaterAlias in aliases:
                    if feed not in activeFeeds:
                        activeFeeds.append(feed)
                        self.feedSetActive(feed)
                    return

        def eaterSetInactive(eaterAlias):
            for feed, aliases in self.logicalFeeds.items():
                if eaterAlias in aliases:
                    if feed in activeFeeds:
                        activeFeeds.remove(feed)
                        self.feedSetInactive(feed)
                    return

        # logical feeds already reported as active; shared by both watches
        activeFeeds = []
        for alias in self.eaters:
            self.eaters[alias].addWatch(eaterSetActive, eaterSetInactive)

    def get_switch_elements(self, pipeline):
        """
        Return the switch elements of the pipeline.

        @param pipeline: the pipeline to look in
        @returns: a list of switch elements
        """
        raise errors.NotImplementedError('subclasses should implement '
                                         'get_switch_elements')

    def is_active(self, feed):
        """
        @param feed: a logical feed
        @returns: whether all eaters of the logical feed are active
        """
        return python.all([self.eaters[alias].isActive()
                           for alias in self.logicalFeeds[feed]])

    def feedSetActive(self, feed):
        """
        Called when a logical feed becomes active; switches to it if it
        is the ideal feed.

        @param feed: a logical feed
        """
        self.debug('feed %r is now active', feed)
        if feed == self.idealFeed:
            self.do_switch()

    def feedSetInactive(self, feed):
        """
        Called when a logical feed becomes inactive; only logs.

        @param feed: a logical feed
        """
        self.debug('feed %r is now inactive', feed)

    # this function is used by the watchdogs

    def auto_switch(self):
        """
        Switch to the most preferred logical feed that is active.

        If no feed is active, fall back to the most preferred feed.
        """
        for feed in self.feedsByPriority[:]:
            if self.is_active(feed):
                self.debug('autoswitch selects feed %r', feed)
                self.do_switch(feed)
                return
            self.debug("could not select feed %r because not active", feed)

        # Nothing is active. feedsByPriority is a list, so index it
        # instead of calling .get(); None makes do_switch() use the ideal
        # feed.
        fallback = None
        if self.feedsByPriority:
            fallback = self.feedsByPriority[0]
        self.debug('no feeds active during autoswitch, choosing %r',
                   fallback)
        self.do_switch(fallback)

    # switch_to should only be called when the ideal feed is requested to be
    # changed, so not by watchdog reasons.

    def switch_to(self, feed):
        """
        Make the given logical feed the ideal feed, and switch to it right
        away if it is active. If it is not active a warning is added.

        @param feed: a logical feed
        """
        if feed not in self.logicalFeeds:
            self.warning("unknown logical feed: %s", feed)
            return None

        self.debug('scheduling switch to feed %s', feed)
        self.idealFeed = feed
        # bump this feed above the others in feedsByPriority
        self.feedsByPriority = [feed]
        for name, aliases in self.get_logical_feeds():
            if name != feed:
                self.feedsByPriority.append(name)

        if not self.pipeline:
            return

        if self.is_active(feed):
            self.do_switch()
        else:
            fmt = N_("Tried to switch to %s, but feed is unavailable. "
                     "Will retry when the feed is back.")
            self.addWarning("temporary-switch-problem", fmt, feed)

    # Switching multiple eaters is easy. The only trick is that we have
    # to close the previous segment at the same running time, on both
    # switch elements, and open the new segment at the same running
    # time. The block()/switch() signal API on switch elements lets us
    # do this. See the docs for switch's `block' and `switch' signals
    # for more information.

    def do_switch(self, feed=None):
        """
        Switch all switch elements to the pads of the given logical feed.

        @param feed: a logical feed; defaults to the ideal feed
        """
        if feed is None:
            feed = self.idealFeed

        self.clearWarning('temporary-switch-problem')
        if feed == self.activeFeed:
            self.debug("already streaming from feed %r", feed)
            return
        if feed not in self.logicalFeeds:
            self.warning("unknown logical feed: %s", feed)
            return

        # (pad, switch)
        pairs = [self.switchPads[alias]
                 for alias in self.logicalFeeds[feed]]

        stop_times = [e.emit('block') for p, e in pairs]
        start_times = [p.get_property('running-time') for p, e in pairs]

        stop_time = max(stop_times)
        self.debug('stop time = %d', stop_time)
        self.debug('stop time = %s', gst.TIME_ARGS(stop_time))

        if stop_time != gst.CLOCK_TIME_NONE:
            diff = float(stop_time - min(stop_times))
            if diff > gst.SECOND * 10:
                fmt = N_("When switching to %s, feed timestamps out"
                         " of sync by %us")
                self.addWarning('large-timestamp-difference', fmt,
                                feed, diff / gst.SECOND, priority=40)

        start_time = min(start_times)
        self.debug('start time = %s', gst.TIME_ARGS(start_time))

        self.debug('switching from %r to %r', self.activeFeed, feed)
        for p, e in pairs:
            self.debug("switching to pad %r", p)
            e.emit('switch', p, stop_time, start_time)

        self.activeFeed = feed
        self.uiState.set("active-eater", feed)
332
class SingleSwitch(Switch):
    """
    Switch with one eater per logical feed and a single switch element
    named "muxer".
    """
    logCategory = "single-switch"

    def get_logical_feeds(self):
        """
        @returns: the "master" and "backup" logical feeds, each made up
                  of the single eater with the same name
        """
        return [(feedName, [feedName]) for feedName in ('master', 'backup')]

    def get_muxer_string(self, properties):
        """
        @param properties: the component properties (unused)
        @returns: the pipeline description of the switch element followed
                  by an identity element
        """
        selector = "input-selector name=muxer"
        identity = "identity silent=true single-segment=true name=iden "
        return selector + " ! " + identity

    def get_switch_elements(self, pipeline):
        """
        @param pipeline: the pipeline to look in
        @returns: a list holding the element named "muxer"
        """
        muxer = pipeline.get_by_name('muxer')
        return [muxer]
346
class AVSwitch(Switch):
    """
    Switch whose logical feeds each consist of a video and an audio
    eater, switched by the elements named "vswitch" and "aswitch".
    """
    logCategory = "av-switch"

    def init(self):
        """
        Set up the maps from component property names to the keyword
        arguments used when building the forced caps.
        """
        # property name -> caps property name
        self.vparms = {'video-width': 'width', 'video-height': 'height',
                       'video-framerate': 'framerate',
                       'video-pixel-aspect-ratio': 'par'}
        self.aparms = {'audio-channels': 'channels',
                       'audio-samplerate': 'samplerate'}

    def get_logical_feeds(self):
        """
        @returns: the "master" and "backup" logical feeds, each with a
                  video and an audio eater
        """
        return [('master', ['video-master', 'audio-master']),
                ('backup', ['video-backup', 'audio-backup'])]

    def get_switch_elements(self, pipeline):
        """
        @param pipeline: the pipeline to look in
        @returns: the video and audio switch elements
        """
        # these have to be in the same order as the lists in
        # get_logical_feeds
        return [pipeline.get_by_name('vswitch'),
                pipeline.get_by_name('aswitch')]

    def addError(self, id, format, *args, **kwargs):
        """
        Log a warning, add an ERROR message to the component and abort
        by raising.

        @param id:     message id
        @param format: format string for the message
        @param args:   arguments for the format string
        @param kwargs: extra keyword arguments for messages.Message
        @raises errors.ComponentSetupHandledError: always
        """
        self.warning(format, *args)
        m = messages.Message(messages.ERROR, T_(format, *args),
                             id=id, **kwargs)
        self.addMessage(m)
        raise errors.ComponentSetupHandledError()

    def do_check(self):
        """
        Check that the video and the audio parameters are each given
        either completely or not at all, then run the checks of Switch.

        @returns: the result of Switch.do_check()
        @raises errors.ComponentSetupHandledError: through addError(),
                when only some parameters of a kind were specified
        """
        propkeys = sets.Set(self.config['properties'].keys())
        vparms = sets.Set(self.vparms.keys())
        aparms = sets.Set(self.aparms.keys())

        for kind, parms in ('Video', vparms), ('Audio', aparms):
            missing = parms - propkeys
            if missing and missing != parms:
                # sort, so the message does not depend on set ordering
                names = list(missing)
                names.sort()
                fmt = N_("%s parameter(s) were specified but not all. "
                         "Missing parameters are: %r")
                self.addError("video-params-not-specified", fmt, kind,
                              names)

        # This class uses input-selector and the inherited do_switch()
        # emits its 'block' signal, so the check done by Switch is needed
        # here as well.
        return Switch.do_check(self)

    def get_pipeline_string(self, properties):
        """
        Build the pipeline description: one input-selector for video and
        one for audio, with every eater linked to the selector matching
        its alias. Streams are forced to the caps given by the video and
        audio properties, if any.

        @param properties: the component properties
        @returns: the pipeline description
        @raises AssertionError: if an eater alias contains neither
                                "video" nor "audio"
        """
        def i420caps(framerate, par, width, height):
            return ("video/x-raw-yuv,width=%d,height=%d,framerate=%d/%d,"
                    "pixel-aspect-ratio=%d/%d,format=(fourcc)I420"
                    % (width, height, framerate[0], framerate[1],
                       par[0], par[1]))

        def audiocaps(channels, samplerate):
            # NOTE(review): GStreamer raw audio caps usually name this
            # field "rate", not "samplerate"; confirm before relying on
            # this filter.
            return ("audio/x-raw-int,channels=%d,samplerate=%d,width=16,"
                    "depth=16,signed=true" % (channels, samplerate))

        def props2caps(proc, parms, prefix, suffix=' ! '):
            # collect the properties that were set as keyword arguments
            # for proc; without any, nothing is forced
            kw = dict([(parms[prop], properties[prop])
                       for prop in properties if prop in parms])
            if kw:
                return prefix + proc(**kw) + suffix
            else:
                return ''

        vforce = props2caps(i420caps, self.vparms,
                            "ffmpegcolorspace ! videorate ! videoscale "
                            "! capsfilter caps=")
        # NOTE(review): the second audioconvert looks like it was meant to
        # be a resampler; confirm.
        aforce = props2caps(audiocaps, self.aparms,
                            "audioconvert ! audioconvert ! capsfilter caps=")

        pipeline = ("input-selector name=vswitch"
                    " ! identity silent=true single-segment=true"
                    " ! @feeder:video@ "
                    "input-selector name=aswitch"
                    " ! identity silent=true single-segment=true"
                    " ! @feeder:audio@ ")
        for alias in self.eaters:
            if "video" in alias:
                pipeline += '@eater:%s@ ! %s vswitch. ' % (alias, vforce)
            elif "audio" in alias:
                pipeline += '@eater:%s@ ! %s aswitch. ' % (alias, aforce)
            else:
                raise AssertionError()

        return pipeline