Package flumotion :: Package component :: Package producers :: Package playlist :: Module playlist
[hide private]

Source Code for Module flumotion.component.producers.playlist.playlist

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import time 
 23   
 24  import gst 
 25  import gobject 
 26  from twisted.internet import defer, reactor 
 27   
 28  from flumotion.common import errors, messages, log, fxml 
 29  from flumotion.common.i18n import N_, gettexter 
 30  from flumotion.component import feedcomponent 
 31  from flumotion.component.base import watcher 
 32   
 33  import smartscale 
 34  import singledecodebin 
 35  import playlistparser 
 36   
 37  __version__ = "$Rev: 6695 $" 
 38  T_ = gettexter() 
 39   
 40   
41 -def _tsToString(ts):
42 """ 43 Return a string in local time from a gstreamer timestamp value 44 """ 45 return time.ctime(ts/gst.SECOND)
46
47 -def videotest_gnl_src(name, start, duration, priority, pattern=None):
48 src = gst.element_factory_make('videotestsrc') 49 if pattern: 50 src.props.pattern = pattern 51 else: 52 # Set videotestsrc to all black. 53 src.props.pattern = 2 54 gnlsrc = gst.element_factory_make('gnlsource', name) 55 gnlsrc.props.start = start 56 gnlsrc.props.duration = duration 57 gnlsrc.props.media_start = 0 58 gnlsrc.props.media_duration = duration 59 gnlsrc.props.priority = priority 60 gnlsrc.add(src) 61 62 return gnlsrc
63
64 -def audiotest_gnl_src(name, start, duration, priority, wave=None):
65 src = gst.element_factory_make('audiotestsrc') 66 if wave: 67 src.props.wave = wave 68 else: 69 # Set audiotestsrc to use silence. 70 src.props.wave = 4 71 gnlsrc = gst.element_factory_make('gnlsource', name) 72 gnlsrc.props.start = start 73 gnlsrc.props.duration = duration 74 gnlsrc.props.media_start = 0 75 gnlsrc.props.media_duration = duration 76 gnlsrc.props.priority = priority 77 gnlsrc.add(src) 78 79 return gnlsrc
80
81 -def file_gnl_src(name, uri, caps, start, duration, offset, priority):
82 src = singledecodebin.SingleDecodeBin(caps, uri) 83 gnlsrc = gst.element_factory_make('gnlsource', name) 84 gnlsrc.props.start = start 85 gnlsrc.props.duration = duration 86 gnlsrc.props.media_start = offset 87 gnlsrc.props.media_duration = duration 88 gnlsrc.props.priority = priority 89 gnlsrc.add(src) 90 91 return gnlsrc
92
93 -class PlaylistProducerMedium(feedcomponent.FeedComponentMedium):
94 - def __init__(self, comp):
96
97 - def remote_add_playlist(self, data):
98 self.comp.addPlaylist(data)
99
100 -class PlaylistProducer(feedcomponent.FeedComponent):
101 logCategory = 'playlist-prod' 102 componentMediumClass = PlaylistProducerMedium 103
104 - def init(self):
105 self.basetime = -1 106 107 self._hasAudio = True 108 self._hasVideo = True 109 110 # The gnlcompositions for audio and video 111 self.videocomp = None 112 self.audiocomp = None 113 114 self.videocaps = gst.Caps("video/x-raw-yuv;video/x-raw-rgb") 115 self.audiocaps = gst.Caps("audio/x-raw-int;audio/x-raw-float") 116 117 self._vsrcs = {} # { PlaylistItem -> gnlsource } 118 self._asrcs = {} # { PlaylistItem -> gnlsource }
119
120 - def _buildAudioPipeline(self, pipeline, src):
121 audiorate = gst.element_factory_make("audiorate") 122 audioconvert = gst.element_factory_make('audioconvert') 123 audioresample = gst.element_factory_make('audioresample') 124 outcaps = gst.Caps( 125 "audio/x-raw-int,channels=%d,rate=%d,width=16,depth=16" % 126 (self._channels, self._samplerate)) 127 128 capsfilter = gst.element_factory_make("capsfilter") 129 capsfilter.props.caps = outcaps 130 131 pipeline.add(audiorate, audioconvert, audioresample, capsfilter) 132 src.link(audioconvert) 133 audioconvert.link(audioresample) 134 audioresample.link(audiorate) 135 audiorate.link(capsfilter) 136 137 return capsfilter.get_pad('src')
138
139 - def _buildVideoPipeline(self, pipeline, src):
140 outcaps = gst.Caps( 141 "video/x-raw-yuv,width=%d,height=%d,framerate=%d/%d," 142 "pixel-aspect-ratio=1/1" % 143 (self._width, self._height, self._framerate[0], 144 self._framerate[1])) 145 146 cspace = gst.element_factory_make("ffmpegcolorspace") 147 scaler = smartscale.SmartVideoScale() 148 scaler.set_caps(outcaps) 149 videorate = gst.element_factory_make("videorate") 150 capsfilter = gst.element_factory_make("capsfilter") 151 capsfilter.props.caps = outcaps 152 153 pipeline.add(cspace, scaler, videorate, capsfilter) 154 155 src.link(cspace) 156 cspace.link(scaler) 157 scaler.link(videorate) 158 videorate.link(capsfilter) 159 return capsfilter.get_pad('src')
160
161 - def _buildPipeline(self):
162 pipeline = gst.Pipeline() 163 164 for mediatype in ['audio', 'video']: 165 if (mediatype == 'audio' and not self._hasAudio) or ( 166 mediatype == 'video' and not self._hasVideo): 167 continue 168 169 # For each of audio, video, we build a pipeline that looks roughly 170 # like: 171 # 172 # gnlcomposition ! identity single-segment=true ! 173 # audio/video-elements ! identity sync=true ! sink 174 175 composition = gst.element_factory_make("gnlcomposition", 176 mediatype + "-composition") 177 178 segmentidentity = gst.element_factory_make("identity") 179 segmentidentity.set_property("single-segment", True) 180 segmentidentity.set_property("silent", True) 181 syncidentity = gst.element_factory_make("identity") 182 syncidentity.set_property("silent", True) 183 syncidentity.set_property("sync", True) 184 185 pipeline.add(composition, segmentidentity, syncidentity) 186 187 def _padAddedCb(element, pad, target): 188 self.debug("Pad added, linking") 189 pad.link(target)
190 composition.connect('pad-added', _padAddedCb, 191 segmentidentity.get_pad("sink")) 192 193 if mediatype == 'audio': 194 self.audiocomp = composition 195 srcpad = self._buildAudioPipeline(pipeline, segmentidentity) 196 else: 197 self.videocomp = composition 198 srcpad = self._buildVideoPipeline(pipeline, segmentidentity) 199 200 srcpad.link(syncidentity.get_pad('sink')) 201 202 feederchunk = self.get_feeder_template(mediatype) 203 binstr = "bin.("+feederchunk+" )" 204 self.debug("Parse for media composition is %s", binstr) 205 206 bin = gst.parse_launch(binstr) 207 pad = bin.find_unconnected_pad(gst.PAD_SINK) 208 ghostpad = gst.GhostPad(mediatype + "-feederpad", pad) 209 bin.add_pad(ghostpad) 210 211 pipeline.add(bin) 212 syncidentity.get_pad('src').link(ghostpad) 213 214 return pipeline
215
216 - def _createDefaultSources(self, properties):
217 if self._hasVideo: 218 vsrc = videotest_gnl_src("videotestdefault", 0, 2**63 - 1, 219 2**31 - 1, properties.get('video-pattern', None)) 220 self.videocomp.add(vsrc) 221 222 if self._hasAudio: 223 asrc = audiotest_gnl_src("videotestdefault", 0, 2**63 - 1, 224 2**31 - 1, properties.get('audio-wave', None)) 225 self.audiocomp.add(asrc)
226
227 - def set_master_clock(self, ip, port, base_time):
228 raise NotImplementedError("Playlist producer doesn't support slaving")
229
230 - def provide_master_clock(self, port):
231 # Most of this copied from feedcomponent010, but changed in various 232 # ways. Refactor the base class? 233 if self.medium: 234 ip = self.medium.getIP() 235 else: 236 ip = "127.0.0.1" 237 238 clock = self.pipeline.get_clock() 239 self.clock_provider = gst.NetTimeProvider(clock, None, port) 240 # small window here but that's ok 241 self.clock_provider.set_property('active', False) 242 243 self._master_clock_info = (ip, port, self.basetime) 244 245 return defer.succeed(self._master_clock_info)
246
247 - def get_master_clock(self):
248 return self._master_clock_info
249
250 - def _setupClock(self, pipeline):
251 # Configure our pipeline to use a known basetime and clock. 252 clock = gst.SystemClock() 253 # It doesn't matter too much what this basetime is, so long as we know 254 # the value. 255 self.basetime = clock.get_time() 256 257 # We force usage of the system clock. 258 pipeline.use_clock(clock) 259 # Now we disable default basetime distribution 260 pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE) 261 # And we choose our own basetime... 262 self.debug("Setting basetime of %d", self.basetime) 263 pipeline.set_base_time(self.basetime)
264
265 - def timeReport(self):
266 ts = self.pipeline.get_clock().get_time() 267 self.debug("Pipeline clock is now at %d -> %s", ts, _tsToString(ts)) 268 reactor.callLater(10, self.timeReport)
269
270 - def getCurrentPosition(self):
271 return self.pipeline.query_position(gst.FORMAT_TIME)[0]
272
273 - def scheduleItem(self, item):
274 """ 275 Schedule a given playlist item in our playback compositions. 276 """ 277 start = item.timestamp - self.basetime 278 self.debug("Starting item %s at %d seconds from start: %s", item.uri, 279 start/gst.SECOND, _tsToString(item.timestamp)) 280 281 # If we schedule things to start before the current pipeline position, 282 # gnonlin will adjust this to start now. However, it does this 283 # separately for audio and video, so we start from different points, 284 # thus we're out of sync. 285 # So, always start slightly in the future... 5 seconds seems to work 286 # fine in practice. 287 now = self.getCurrentPosition() 288 neareststarttime = now + 5 * gst.SECOND 289 290 if start < neareststarttime: 291 if start + item.duration < neareststarttime: 292 self.debug("Item too late; skipping entirely") 293 return False 294 else: 295 change = neareststarttime - start 296 self.debug("Starting item with offset %d", change) 297 item.duration -= change 298 item.offset += change 299 start = neareststarttime 300 301 end = start + item.duration 302 timeuntilend = end - now 303 # After the end time, remove this item from the composition, otherwise 304 # it will continue to use huge gobs of memory and lots of threads. 305 reactor.callLater(timeuntilend/gst.SECOND + 5, 306 self.unscheduleItem, item) 307 308 if self._hasVideo and item.hasVideo: 309 self.debug("Adding video source with start %d, duration %d, " 310 "offset %d", start, item.duration, item.offset) 311 vsrc = file_gnl_src(None, item.uri, self.videocaps, 312 start, item.duration, item.offset, 0) 313 self.videocomp.add(vsrc) 314 self._vsrcs[item] = vsrc 315 if self._hasAudio and item.hasAudio: 316 self.debug("Adding audio source with start %d, duration %d, " 317 "offset %d", start, item.duration, item.offset) 318 asrc = file_gnl_src(None, item.uri, self.audiocaps, 319 start, item.duration, item.offset, 0) 320 self.audiocomp.add(asrc) 321 self._asrcs[item] = asrc 322 self.debug("Done scheduling: start at %s, end at %s", 323 _tsToString(start + self.basetime), 324 _tsToString(start + self.basetime + item.duration)) 325 return True
326
327 - def unscheduleItem(self, item):
328 self.debug("Unscheduling item at uri %s", item.uri) 329 if self._hasVideo and item.hasVideo and item in self._vsrcs: 330 vsrc = self._vsrcs.pop(item) 331 self.videocomp.remove(vsrc) 332 vsrc.set_state(gst.STATE_NULL) 333 if self._hasAudio and item.hasAudio and item in self._asrcs: 334 asrc = self._asrcs.pop(item) 335 self.audiocomp.remove(asrc) 336 asrc.set_state(gst.STATE_NULL)
337
338 - def adjustItemScheduling(self, item):
339 if self._hasVideo and item.hasVideo: 340 vsrc = self._vsrcs[item] 341 vsrc.props.start = item.timestamp 342 vsrc.props.duration = item.duration 343 vsrc.props.media_duration = item.duration 344 if self._hasAudio and item.hasAudio: 345 asrc = self._asrcs[item] 346 asrc.props.start = item.timestamp 347 asrc.props.duration = item.duration 348 asrc.props.media_duration = item.duration
349
350 - def addPlaylist(self, data):
351 self.playlistparser.parseData(data)
352
353 - def create_pipeline(self):
354 props = self.config['properties']; 355 356 self._playlistfile = props.get('playlist', None) 357 self._playlistdirectory = props.get('playlist-directory', None) 358 self._baseDirectory = props.get('base-directory', None) 359 360 self._width = props.get('width', 320) 361 self._height = props.get('height', 240) 362 self._framerate = props.get('framerate', (15, 1)) 363 self._samplerate = props.get('samplerate', 44100) 364 self._channels = props.get('channels', 2) 365 366 self._hasAudio = props.get('audio', True) 367 self._hasVideo = props.get('video', True) 368 369 pipeline = self._buildPipeline() 370 self._setupClock(pipeline) 371 372 self._createDefaultSources(props) 373 374 return pipeline
375
376 - def _watchDirectory(self, dir):
377 self.debug("Watching directory %s", dir) 378 self._filesAdded = {} 379 380 self._directoryWatcher = watcher.DirectoryWatcher(dir) 381 self._directoryWatcher.subscribe(fileChanged=self._watchFileChanged, 382 fileDeleted=self._watchFileDeleted) 383 384 # in the start call watcher should find all the existing 385 # files, so we block discovery while the watcher starts 386 self.playlistparser.blockDiscovery() 387 try: 388 self._directoryWatcher.start() 389 finally: 390 self.playlistparser.unblockDiscovery()
391
392 - def _watchFileDeleted(self, file):
393 self.debug("File deleted: %s", file) 394 if file in self._filesAdded: 395 self.playlistparser.playlist.removeItems(file) 396 self._filesAdded.pop(file) 397 398 self._cleanMessage(file)
399
400 - def _cleanMessage(self, file):
401 # There's no message removal API! We have to do this instead. Ick? 402 msgid = ("playlist-parse-error", file) 403 for m in self.state.get('messages'): 404 if m.id == msgid: 405 self.state.remove('messages', m)
406
407 - def _watchFileChanged(self, file):
408 self.debug("File changed: %s", file) 409 if file in self._filesAdded: 410 self.debug("Removing existing items for changed playlist") 411 self.playlistparser.playlist.removeItems(file) 412 413 self._filesAdded[file] = None 414 self._cleanMessage(file) 415 try: 416 self.debug("Parsing file: %s", file) 417 self.playlistparser.parseFile(file, id=file) 418 except fxml.ParserError, e: 419 self.warning("Failed to parse playlist file: %r", e) 420 # Since this isn't done directly via the remote method, add a 421 # message so people can find out that it failed... 422 # Use a tuple including the filename to identify the warning, so we 423 # can add/remove one per file 424 msgid = ("playlist-parse-error", file) 425 self.addMessage( 426 messages.Warning(T_(N_( 427 "Failed to parse a playlist from file %s: %s" % 428 (file, e))), mid=msgid))
429
430 - def do_setup(self):
431 playlist = playlistparser.Playlist(self) 432 self.playlistparser = playlistparser.PlaylistXMLParser(playlist) 433 if self._baseDirectory: 434 self.playlistparser.setBaseDirectory(self._baseDirectory) 435 436 if self._playlistfile: 437 try: 438 self.playlistparser.parseFile(self._playlistfile) 439 except fxml.ParserError, e: 440 self.warning("Failed to parse playlist file: %r", e) 441 442 if self._playlistdirectory: 443 self._watchDirectory(self._playlistdirectory) 444 445 reactor.callLater(10, self.timeReport)
446