Package flumotion :: Package component :: Package producers :: Package looper :: Module looper
[hide private]

Source Code for Module flumotion.component.producers.looper.looper

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  import gobject 
 24   
 25  from flumotion.common import errors, messages 
 26  from flumotion.common.i18n import N_, gettexter 
 27  from flumotion.component import feedcomponent 
 28   
 29  __version__ = "$Rev: 6695 $" 
 30  T_ = gettexter() 
 31   
 32   
33 -class LooperMedium(feedcomponent.FeedComponentMedium):
34 - def __init__(self, comp):
36
37 - def remote_gimme5(self, text):
38 return self.comp.do_seek()
39
40 - def remote_getNbIterations(self):
41 return self.comp.nbiterations
42
44 return self.comp.fileinformation
45 46 47 # How to start the first segment: 48 # 1) Make your pipeline, but don't link the sinks 49 # 2) Block the source pads of what would be the sinks' peers 50 # 3) When both block functions fire, link the pads, then do a segment seek 51 # 4) Then you can unblock pads and the sinks will receive exactly one 52 # new segment with all gst versions 53 # 54 # To loop a segment, when you get the segment_done message 55 # asynchronously, just do a new segment seek. 56
57 -class Looper(feedcomponent.ParseLaunchComponent):
58 59 componentMediumClass = LooperMedium 60
61 - def init(self):
62 self.initial_seek = False 63 self.nbiterations = 0 64 self.fileinformation = None 65 self.timeoutid = 0 66 self.pads_awaiting_block = [] 67 self.pads_to_link = [] 68 self.bus = None 69 self.uiState.addKey('info-location', '') 70 self.uiState.addKey('info-duration', 0) 71 self.uiState.addKey('info-audio', None) 72 self.uiState.addKey('info-video', None) 73 self.uiState.addKey('num-iterations', 0) 74 self.uiState.addKey('position', 0)
75
76 - def do_check(self):
77 def on_result(result): 78 for m in result.messages: 79 self.addMessage(m)
80 81 from flumotion.component.producers import checks 82 d = checks.checkTicket349() 83 d.addCallback(on_result) 84 return d
85
86 - def get_pipeline_string(self, properties):
87 # setup the properties 88 self.bus = None 89 self.videowidth = properties.get('width', 240) 90 self.videoheight = properties.get('height', int(576 * self.videowidth/720.)) 91 self.videoframerate = properties.get('framerate', (25, 2)) 92 self.filelocation = properties.get('location') 93 94 vstruct = gst.structure_from_string("video/x-raw-yuv,width=%(width)d,height=%(height)d" % 95 dict (width=self.videowidth, height=self.videoheight)) 96 vstruct['framerate'] = gst.Fraction(self.videoframerate[0], 97 self.videoframerate[1]) 98 99 vcaps = gst.Caps(vstruct) 100 101 self.run_discoverer() 102 103 template = ( 104 'filesrc location=%(location)s' 105 ' ! oggdemux name=demux' 106 ' demux. ! queue ! theoradec name=theoradec' 107 ' ! identity name=videolive single-segment=true silent=true' 108 ' ! videorate name=videorate' 109 ' ! videoscale' 110 ' ! %(vcaps)s' 111 ' ! identity name=vident sync=true silent=true ! @feeder:video@' 112 ' demux. ! queue ! vorbisdec name=vorbisdec' 113 ' ! identity name=audiolive single-segment=true silent=true' 114 ' ! audioconvert' 115 ' ! audio/x-raw-int,width=16,depth=16,signed=(boolean)true' 116 ' ! identity name=aident sync=true silent=true ! @feeder:audio@' 117 % dict(location=self.filelocation, vcaps=vcaps)) 118 119 return template
120
121 - def make_message_for_gstreamer_error(self, gerror, debug):
122 if gerror.domain == 'gst-resource-error-quark': 123 return messages.Error(T_(N_( 124 "Could not open file '%s' for reading."), self.filelocation), 125 debug='%s\n%s' % (gerror.message, debug), 126 mid=gerror.domain, priority=40) 127 base = feedcomponent.ParseLaunchComponent 128 return base.make_message_for_gstreamer_error(gerror, debug)
129
130 - def run_discoverer(self):
131 def discovered(d, ismedia): 132 self.uiState.set('info-location', self.filelocation) 133 self.uiState.set('info-duration', 134 max(d.audiolength, d.videolength)) 135 if d.is_audio: 136 self.uiState.set('info-audio', 137 "%d channel(s) %dHz" % (d.audiochannels, 138 d.audiorate)) 139 if d.is_video: 140 self.uiState.set('info-video', 141 "%d x %d at %d/%d fps" % (d.videowidth, 142 d.videoheight, 143 d.videorate.num, 144 d.videorate.denom))
145 146 from gst.extend import discoverer 147 d = discoverer.Discoverer(self.filelocation) 148 d.connect('discovered', discovered) 149 d.discover() 150
151 - def on_segment_done(self):
152 self.do_seek(False) 153 self.nbiterations += 1 154 self.uiState.set('num-iterations', self.nbiterations)
155
156 - def on_pads_blocked(self):
157 for src, sink in self.pads_to_link: 158 src.link(sink) 159 self.do_seek(True) 160 for src, sink in self.pads_to_link: 161 src.set_blocked_async(False, lambda *x: None) 162 self.pads_to_link = [] 163 self.nbiterations = 0 164 self.uiState.set('num-iterations', self.nbiterations)
165
166 - def configure_pipeline(self, pipeline, properties):
167 def on_message(bus, message): 168 handlers = {(pipeline, gst.MESSAGE_SEGMENT_DONE): 169 self.on_segment_done, 170 (pipeline, gst.MESSAGE_APPLICATION): 171 self.on_pads_blocked} 172 173 if (message.src, message.type) in handlers: 174 handlers[(message.src, message.type)]()
175 176 self.oggdemux = pipeline.get_by_name("demux") 177 178 for name in 'aident', 'vident': 179 def blocked(x, is_blocked): 180 if not x in self.pads_awaiting_block: 181 return 182 self.pads_awaiting_block.remove(x) 183 if not self.pads_awaiting_block: 184 s = gst.Structure('pads-blocked') 185 m = gst.message_new_application(pipeline, s) 186 # marshal to the main thread 187 pipeline.post_message(m) 188 189 e = pipeline.get_by_name(name) 190 src = e.get_pad('src') 191 sink = src.get_peer() 192 src.unlink(sink) 193 src.set_blocked_async(True, blocked) 194 self.pads_awaiting_block.append(src) 195 self.pads_to_link.append((src, sink)) 196 197 self.bus = pipeline.get_bus() 198 self.bus.add_signal_watch() 199 200 self.bus.connect('message', on_message) 201
202 - def do_seek(self, flushing):
203 """ 204 Restarts the looping. 205 206 Returns True if the seeking was accepted, 207 Returns False otherwiser 208 """ 209 self.debug("restarting looping") 210 flags = gst.SEEK_FLAG_SEGMENT | (flushing and gst.SEEK_FLAG_FLUSH or 0) 211 return self.oggdemux.seek(1.0, gst.FORMAT_TIME, flags, 212 gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_END, 0)
213
214 - def do_setup(self):
215 def check_time(): 216 self.log("checking position") 217 try: 218 pos, format = self.pipeline.query_position(gst.FORMAT_TIME) 219 except: 220 self.debug("position query didn't succeed") 221 else: 222 self.uiState.set('position', pos) 223 return True
224 225 if not self.timeoutid: 226 self.timeoutid = gobject.timeout_add(500, check_time) 227
228 - def do_stop(self):
229 if self.bus: 230 self.bus.remove_signal_watch() 231 self.bus = None 232 233 if self.timeoutid: 234 gobject.source_remove(self.timeoutid) 235 self.timeoutid = 0 236 237 self.nbiterations = 0
238