Package flumotion :: Package component :: Package producers :: Package looper :: Module looper
[hide private]

Source Code for Module flumotion.component.producers.looper.looper

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  import gobject 
 24   
 25  from flumotion.common import errors, messages 
 26  from flumotion.component import feedcomponent 
 27   
 28  from flumotion.common.messages import N_ 
 29  T_ = messages.gettexter('flumotion') 
 30   
31 -class LooperMedium(feedcomponent.FeedComponentMedium):
32 - def __init__(self, comp):
34
35 - def remote_gimme5(self, text):
36 return self.comp.do_seek()
37
38 - def remote_getNbIterations(self):
39 return self.comp.nbiterations
40
42 return self.comp.fileinformation
43 44 45 # How to start the first segment: 46 # 1) Make your pipeline, but don't link the sinks 47 # 2) Block the source pads of what would be the sinks' peers 48 # 3) When both block functions fire, link the pads, then do a segment seek 49 # 4) Then you can unblock pads and the sinks will receive exactly one 50 # new segment with all gst versions 51 # 52 # To loop a segment, when you get the segment_done message 53 # asynchronously, just do a new segment seek. 54
55 -class Looper(feedcomponent.ParseLaunchComponent):
56 57 componentMediumClass = LooperMedium 58
59 - def init(self):
60 self.initial_seek = False 61 self.nbiterations = 0 62 self.fileinformation = None 63 self.timeoutid = 0 64 self.pads_awaiting_block = [] 65 self.pads_to_link = [] 66 self.bus = None 67 self.uiState.addKey('info-location', '') 68 self.uiState.addKey('info-duration', 0) 69 self.uiState.addKey('info-audio', None) 70 self.uiState.addKey('info-video', None) 71 self.uiState.addKey('num-iterations', 0) 72 self.uiState.addKey('position', 0)
73
74 - def do_check(self):
75 def on_result(result): 76 for m in result.messages: 77 self.addMessage(m)
78 79 from flumotion.component.producers import checks 80 d = checks.checkTicket349() 81 d.addCallback(on_result) 82 return d
83
84 - def get_pipeline_string(self, properties):
85 # setup the properties 86 self.bus = None 87 self.videowidth = properties.get('width', 240) 88 self.videoheight = properties.get('height', int(576 * self.videowidth/720.)) 89 self.videoframerate = properties.get('framerate', (25, 2)) 90 self.filelocation = properties.get('location') 91 92 vstruct = gst.structure_from_string("video/x-raw-yuv,width=%(width)d,height=%(height)d" % 93 dict (width=self.videowidth, height=self.videoheight)) 94 vstruct['framerate'] = gst.Fraction(self.videoframerate[0], 95 self.videoframerate[1]) 96 97 vcaps = gst.Caps(vstruct) 98 99 self.run_discoverer() 100 101 template = ( 102 'filesrc location=%(location)s' 103 ' ! oggdemux name=demux' 104 ' demux. ! queue ! theoradec name=theoradec' 105 ' ! identity name=videolive single-segment=true silent=true' 106 ' ! videorate name=videorate' 107 ' ! videoscale' 108 ' ! %(vcaps)s' 109 ' ! identity name=vident sync=true silent=true ! @feeder::video@' 110 ' demux. ! queue ! vorbisdec name=vorbisdec' 111 ' ! identity name=audiolive single-segment=true silent=true' 112 ' ! audioconvert' 113 ' ! audio/x-raw-int,width=16,depth=16,signed=(boolean)true' 114 ' ! identity name=aident sync=true silent=true ! @feeder::audio@' 115 % dict(location=self.filelocation, vcaps=vcaps)) 116 117 return template
118
119 - def run_discoverer(self):
120 def discovered(d, ismedia): 121 self.uiState.set('info-location', self.filelocation) 122 self.uiState.set('info-duration', 123 max(d.audiolength, d.videolength)) 124 if d.is_audio: 125 self.uiState.set('info-audio', 126 "%d channel(s) %dHz" % (d.audiochannels, 127 d.audiorate)) 128 if d.is_video: 129 self.uiState.set('info-video', 130 "%d x %d at %d/%d fps" % (d.videowidth, 131 d.videoheight, 132 d.videorate.num, 133 d.videorate.denom))
134 135 from gst.extend import discoverer 136 d = discoverer.Discoverer(self.filelocation) 137 d.connect('discovered', discovered) 138 d.discover() 139
140 - def on_segment_done(self):
141 self.do_seek(False) 142 self.nbiterations += 1 143 self.uiState.set('num-iterations', self.nbiterations)
144
145 - def on_pads_blocked(self):
146 for src, sink in self.pads_to_link: 147 src.link(sink) 148 self.do_seek(True) 149 for src, sink in self.pads_to_link: 150 src.set_blocked_async(False, lambda *x: None) 151 self.pads_to_link = [] 152 self.nbiterations = 0 153 self.uiState.set('num-iterations', self.nbiterations)
154
155 - def configure_pipeline(self, pipeline, properties):
156 def on_message(bus, message): 157 handlers = {(pipeline, gst.MESSAGE_SEGMENT_DONE): 158 self.on_segment_done, 159 (pipeline, gst.MESSAGE_APPLICATION): 160 self.on_pads_blocked} 161 162 if (message.src, message.type) in handlers: 163 handlers[(message.src, message.type)]()
164 165 self.oggdemux = pipeline.get_by_name("demux") 166 167 for name in 'aident', 'vident': 168 def blocked(x, is_blocked): 169 if not x in self.pads_awaiting_block: 170 return 171 self.pads_awaiting_block.remove(x) 172 if not self.pads_awaiting_block: 173 s = gst.Structure('pads-blocked') 174 m = gst.message_new_application(pipeline, s) 175 # marshal to the main thread 176 pipeline.post_message(m) 177 178 e = pipeline.get_by_name(name) 179 src = e.get_pad('src') 180 sink = src.get_peer() 181 src.unlink(sink) 182 src.set_blocked_async(True, blocked) 183 self.pads_awaiting_block.append(src) 184 self.pads_to_link.append((src, sink)) 185 186 self.bus = pipeline.get_bus() 187 self.bus.add_signal_watch() 188 189 self.bus.connect('message', on_message) 190
191 - def do_seek(self, flushing):
192 """ 193 Restarts the looping. 194 195 Returns True if the seeking was accepted, 196 Returns False otherwiser 197 """ 198 self.debug("restarting looping") 199 flags = gst.SEEK_FLAG_SEGMENT | (flushing and gst.SEEK_FLAG_FLUSH or 0) 200 return self.oggdemux.seek(1.0, gst.FORMAT_TIME, flags, 201 gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_END, 0)
202
203 - def do_start(self, *args, **kwargs):
204 def check_time(): 205 self.log("checking position") 206 try: 207 pos, format = self.pipeline.query_position(gst.FORMAT_TIME) 208 except: 209 self.debug("position query didn't succeed") 210 else: 211 self.uiState.set('position', pos) 212 return True
213 214 def on_start(result): 215 if not self.timeoutid: 216 self.timeoutid = gobject.timeout_add(500, check_time) 217 return result 218 219 d = feedcomponent.ParseLaunchComponent.do_start(self, *args, **kwargs) 220 d.addCallback(on_start) 221 222 return d 223
224 - def do_stop(self):
225 if self.bus: 226 self.bus.remove_signal_watch() 227 self.bus = None 228 229 if self.timeoutid: 230 gobject.source_remove(self.timeoutid) 231 self.timeoutid = 0 232 233 self.nbiterations = 0 234 235 return feedcomponent.ParseLaunchComponent.do_stop(self)
236