Package flumotion :: Package component :: Package producers :: Package playlist :: Module playlistparser
[hide private]

Source Code for Module flumotion.component.producers.playlist.playlistparser

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  from gst.extend import discoverer 
 24   
 25  import time 
 26  import calendar 
 27  from StringIO import StringIO 
 28   
 29  from xml.dom import Node 
 30   
 31  from twisted.internet import reactor 
 32   
 33  from flumotion.common import log, fxml 
 34   
 35  __version__ = "$Rev: 6983 $" 
 36   
 37   
class PlaylistItem(object, log.Loggable):
    """
    A single playlist entry, stored as a node of a doubly linked list.

    @ivar id:        identifier (piid) the item was added under
    @ivar timestamp: start time of the item; the parsers in this module
                     supply nanoseconds since the epoch
    @ivar uri:       location of the media to play
    @ivar offset:    offset into the media, in the same unit as duration
    @ivar duration:  length of time the item plays for
    @ivar hasAudio:  whether the item carries audio; True until overridden
    @ivar hasVideo:  whether the item carries video; True until overridden
    @ivar next:      following item in the list, or None
    @ivar prev:      preceding item in the list, or None
    """

    def __init__(self, piid, timestamp, uri, offset, duration):
        # Not linked into any list yet.
        self.prev = self.next = None

        # Assume both streams are present until the creator says otherwise.
        self.hasVideo = self.hasAudio = True

        self.id, self.uri = piid, uri
        self.timestamp, self.offset, self.duration = \
            timestamp, offset, duration
51
class Playlist(object, log.Loggable):
    """
    Time-ordered doubly linked list of L{PlaylistItem}s, kept in step with
    what has been scheduled on the producer.

    @ivar items:    head of the linked list, or None when it is empty
    @ivar producer: object used to query the playback position and to
                    schedule, reschedule and unschedule items
    """
    logCategory = 'playlist-list'

    def __init__(self, producer):
        """
        Create an initially empty playlist

        @param producer: object providing getCurrentPosition(),
                         scheduleItem(), unscheduleItem() and
                         adjustItemScheduling()
        """
        self.items = None # PlaylistItem linked list
        self._itemsById = {} # piid -> list of PlaylistItem

        self.producer = producer

    def _findItem(self, position):
        """
        Get the item that corresponds to position.

        An item matches when position lies strictly between its start and
        its end.

        @returns: the matching L{PlaylistItem}, or None
        """
        cur = self.items
        while cur:
            if cur.timestamp < position and \
                    cur.timestamp + cur.duration > position:
                return cur
            if cur.timestamp > position:
                return None # fail without having to iterate over everything
            cur = cur.next
        return None

    def _getCurrentItem(self):
        """
        Get the item covering the producer's current playback position.

        @returns: the matching L{PlaylistItem}, or None
        """
        position = self.producer.getCurrentPosition()
        item = self._findItem(position)
        self.debug("Item %r found as current for playback position %d",
            item, position)
        return item

    def _forgetItem(self, item):
        """
        Drop item from the per-id index.

        Items that are no longer indexed are ignored: removeItems() deletes
        the whole index entry even for the items it leaves in the list.
        """
        siblings = self._itemsById.get(item.id)
        if siblings is None:
            return
        if item in siblings:
            siblings.remove(item)
        if not siblings:
            del self._itemsById[item.id]

    def removeItems(self, piid):
        """
        Unlink and unschedule the items that were added under piid.

        Items that start before the end of the currently playing item are
        left in the list and stay scheduled. The index entry for piid is
        dropped either way.

        @param piid: identifier the items were added with
        """
        current = self._getCurrentItem()

        if piid not in self._itemsById:
            return

        for item in self._itemsById[piid]:
            if (current and item.timestamp < current.timestamp +
                    current.duration):
                self.debug("Not removing current item!")
                continue
            self.unlinkItem(item)
            self.producer.unscheduleItem(item)

        del self._itemsById[piid]

    def addItem(self, piid, timestamp, uri, offset, duration,
                hasAudio, hasVideo):
        """
        Add an item to the playlist.

        This may remove overlapping entries, or adjust timestamps/durations of
        entries to make the new one fit.

        @param piid:      identifier to register the item under
        @param timestamp: start time of the item
        @param uri:       location of the media
        @param offset:    offset into the media
        @param duration:  length of time to play
        @param hasAudio:  whether the media has audio
        @param hasVideo:  whether the media has video

        @returns: the new L{PlaylistItem}, or None if it would start during
                  the currently playing item or could not be scheduled
        """
        current = self._getCurrentItem()
        if current and timestamp < current.timestamp + current.duration:
            self.warning("New object at uri %s starts during current object, "
                "cannot add", uri)
            return None
        # We don't care about anything older than now; drop references to them
        if current:
            self.items = current

        newitem = PlaylistItem(piid, timestamp, uri, offset, duration)
        newitem.hasAudio = hasAudio
        newitem.hasVideo = hasVideo

        self._itemsById.setdefault(piid, []).append(newitem)

        newEnd = newitem.timestamp + newitem.duration

        # prevItem is the last item starting strictly before the new item
        prevItem = None
        item = self.items
        while item and item.timestamp < newitem.timestamp:
            prevItem = item
            item = item.next

        # item is now the first entry that does not start before the new
        # item: prevItem.next, or the list head when there is no prevItem.
        firstDisplaced = item

        # nextItem starts after the new item, and ends after the end of the
        # new item
        nextItem = None
        while item:
            if (item.timestamp > newitem.timestamp and
                    item.timestamp + item.duration > newEnd):
                nextItem = item
                break
            item = item.next

        # Everything from firstDisplaced up to nextItem (which might be None)
        # is covered by the new item and has to go. This also applies when
        # the new item becomes the head of the list, otherwise the displaced
        # entries would stay scheduled without being reachable from the list.
        cur = firstDisplaced
        while cur is not nextItem:
            self._forgetItem(cur)
            self.producer.unscheduleItem(cur)
            cur = cur.next

        # update links.
        if prevItem:
            prevItem.next = newitem
            newitem.prev = prevItem
        else:
            self.items = newitem

        if nextItem:
            newitem.next = nextItem
            nextItem.prev = newitem

        # Duration adjustments -> Reflect into gnonlin timeline
        if prevItem and \
                prevItem.timestamp + prevItem.duration > newitem.timestamp:
            shortened = newitem.timestamp - prevItem.timestamp
            self.debug("Changing duration of previous item from %d to %d",
                prevItem.duration, shortened)
            prevItem.duration = shortened
            self.producer.adjustItemScheduling(prevItem)

        if nextItem and newEnd > nextItem.timestamp:
            self.debug("Changing timestamp of next item from %d to %d to fit",
                nextItem.timestamp, newEnd)
            # Keep the end of the next item where it was.
            nextItem.duration = nextItem.duration - \
                (newEnd - nextItem.timestamp)
            nextItem.timestamp = newEnd
            self.producer.adjustItemScheduling(nextItem)

        # Then we need to actually add newitem into the gnonlin timeline
        if not self.producer.scheduleItem(newitem):
            self.debug("Failed to schedule item, unlinking")
            # Failed to schedule it; make sure it is not left behind in the
            # index either, or removeItems() would try to unschedule it.
            self.unlinkItem(newitem)
            self._forgetItem(newitem)
            return None

        return newitem

    def unlinkItem(self, item):
        """
        Take item out of the linked list by joining its neighbours.

        The item's own prev/next references are left untouched, and neither
        the per-id index nor the producer is updated.
        """
        if item.prev:
            item.prev.next = item.next
        else:
            self.items = item.next

        if item.next:
            item.next.prev = item.prev
202
class PlaylistParser(object, log.Loggable):
    """
    Queue playlist entries, run the discoverer over their files one at a
    time and add the ones containing media to a playlist.

    Pending entries are (filename, timestamp, duration, offset, piid)
    tuples.
    """
    logCategory = 'playlist-parse'

    def __init__(self, playlist):
        """
        @param playlist: object whose addItem() receives discovered entries
        """
        self.playlist = playlist

        self._pending_items = []
        self._discovering = False
        self._discovering_blocked = 0

        self._baseDirectory = None

    def setBaseDirectory(self, baseDir):
        """
        Set the directory that relative filenames are resolved against.

        @param baseDir: directory path; a trailing '/' is added if missing
        @type  baseDir: str
        """
        if not baseDir.endswith('/'):
            baseDir = baseDir + '/'
        self._baseDirectory = baseDir

    def blockDiscovery(self):
        """
        Prevent playlist parser from running discoverer on any pending
        playlist entries. Multiple subsequent invocations will require
        the same corresponding number of calls to L{unblockDiscovery}
        to resume discovery.
        """
        self._discovering_blocked += 1
        self.debug('  blocking discovery: %d', self._discovering_blocked)

    def unblockDiscovery(self):
        """
        Resume discovering of any pending playlist entries. If
        L{blockDiscovery} was called multiple times multiple
        invocations of unblockDiscovery will be required to unblock
        the discoverer.
        """
        if self._discovering_blocked > 0:
            self._discovering_blocked -= 1
        self.debug('unblocking discovery: %d', self._discovering_blocked)
        if self._discovering_blocked < 1:
            self.startDiscovery()

    def startDiscovery(self, doSort=True):
        """
        Initiate discovery of any pending playlist entries.

        Nothing happens while a discovery is already running, discovery is
        blocked, or there are no pending entries.

        @param doSort: should the pending entries be ordered
                       chronologically before initiating discovery
        @type  doSort: bool
        """
        self.log('startDiscovery: discovering: %s, block: %d, pending: %d',
            self._discovering, self._discovering_blocked,
            len(self._pending_items))
        if not self._discovering and self._discovering_blocked < 1 \
                and self._pending_items:
            if doSort:
                self._sortPending()
            self._discoverPending()

    def _sortPending(self):
        """
        Order the pending entries by timestamp; entries with the same
        timestamp are ordered by comparing the entries themselves.
        """
        self.debug('sort pending: %d', len(self._pending_items))
        if not self._pending_items:
            return
        self._pending_items = sorted(self._pending_items,
            key=lambda entry: (entry[1], entry))

    def _addDiscoveredItem(self, item, disc):
        """
        Add a successfully discovered pending entry to the playlist.

        @param item: the pending entry tuple
        @param disc: the discoverer that inspected the entry's file
        """
        filename, timestamp, duration, offset, piid = item
        uri = "file://" + filename

        hasA = disc.is_audio
        hasV = disc.is_video
        # Only look at the streams that are present: min() over both lengths
        # gives 0 as soon as one stream is missing (the discoverer
        # presumably leaves an absent stream's length at 0), which would
        # reject every audio-only or video-only file.
        if hasA and hasV:
            durationDiscovered = min(disc.audiolength, disc.videolength)
        elif hasA:
            durationDiscovered = disc.audiolength
        elif hasV:
            durationDiscovered = disc.videolength
        else:
            durationDiscovered = 0

        # Never play for longer than the media lasts.
        if not duration or duration > durationDiscovered:
            duration = durationDiscovered

        # An offset that would run past the end of the media is dropped.
        if duration + offset > durationDiscovered:
            offset = 0

        if duration > 0:
            self.playlist.addItem(piid, timestamp, uri,
                offset, duration, hasA, hasV)
        else:
            self.warning("Duration of item is zero, not adding")

    def _discoverPending(self):
        """
        Run the discoverer on the first pending entry, then reschedule
        itself one second after that entry has been dealt with.
        """
        if not self._pending_items:
            self.debug("No more files to discover")
            self._discovering = False
            return

        if self._discovering_blocked > 0:
            self.debug("Discovering blocked: %d", self._discovering_blocked)
            self._discovering = False
            return

        self._discovering = True

        # Taken off the queue before the callbacks are defined so that it is
        # obvious which entry they close over.
        item = self._pending_items.pop(0)

        def _discoverer_done(disc, is_media):
            try:
                if is_media:
                    self.debug("Discovery complete, media found")
                    self._addDiscoveredItem(item, disc)
                else:
                    self.warning("Discover failed to find media in %s",
                        item[0])
            finally:
                # Always move on, even if adding the item raised; otherwise
                # _discovering stays set and nothing is discovered again.
                #
                # We don't want to burn too much cpu discovering all the
                # files; this throttles the discovery rate to a reasonable
                # level
                self.debug("Continuing on to next file in one second")
                reactor.callLater(1, self._discoverPending)

        def _discovered(disc, is_media):
            self.debug("Discovered!")
            # Hand the result over to the reactor thread.
            reactor.callFromThread(_discoverer_done, disc, is_media)

        self.debug("Discovering file %s", item[0])
        disc = discoverer.Discoverer(item[0])

        disc.connect('discovered', _discovered)
        disc.discover()

    def addItemToPlaylist(self, filename, timestamp, duration, offset, piid):
        """
        Queue an entry for discovery, unless it already lies in the past.

        @param filename:  path of the media file; resolved against the base
                          directory, if one is set, unless it starts with '/'
        @param timestamp: start time, in gst time units since the epoch
        @param duration:  play length in gst time units, or None to use the
                          discovered length
        @param offset:    offset into the media, in gst time units
        @param piid:      identifier to register the item under
        """
        # We only want to add it if it's plausibly schedulable.
        end = timestamp
        if duration is not None:
            end += duration
        if end < time.time() * gst.SECOND:
            self.debug("Early-out: ignoring add for item in past")
            return

        # startswith() also copes with an empty filename.
        if self._baseDirectory and not filename.startswith('/'):
            filename = self._baseDirectory + filename

        self._pending_items.append((filename, timestamp, duration, offset,
            piid))

        # Now launch the discoverer for any pending items
        self.startDiscovery()
344
class PlaylistXMLParser(PlaylistParser):
    """
    Playlist parser reading XML documents whose root element is 'playlist'
    and whose 'entry' children describe the items to play.
    """
    logCategory = 'playlist-xml'

    def parseData(self, data):
        """
        Parse playlist XML document data

        @param data: the XML document
        @type  data: str
        """
        fileHandle = StringIO(data)
        self.parseFile(fileHandle)

    def replaceFile(self, file, piid):
        """
        Remove the playlist items registered under piid, then parse file
        and register its entries under the same piid.
        """
        self.playlist.removeItems(piid)
        self.parseFile(file, piid)

    def parseFile(self, file, piid=None):
        """
        Parse a playlist file. Adds the contents of the file to the existing
        playlist, overwriting any existing entries for the same time period.

        @param file: what to parse; handed to fxml.Parser.getRoot()
        @param piid: identifier to register the entries under

        @raises fxml.ParserError: if the root node is not 'playlist' or an
                                  entry is malformed
        """
        parser = fxml.Parser()

        root = parser.getRoot(file)

        node = root.documentElement
        self.debug("Parsing playlist from file %s", file)
        if node.nodeName != 'playlist':
            raise fxml.ParserError("Root node is not 'playlist'")

        # Hold back discovery until every entry has been queued.
        self.blockDiscovery()
        try:
            for child in node.childNodes:
                if child.nodeType == Node.ELEMENT_NODE and \
                        child.nodeName == 'entry':
                    self.debug("Parsing entry")
                    self._parsePlaylistEntry(parser, child, piid)
        finally:
            self.unblockDiscovery()

    # A simplified private version of this code from fxml without the
    # undesirable unicode->str conversions.
    def _parseAttributes(self, node, required, optional):
        """
        Collect attribute values of node.

        @returns: list with the values of the required attributes followed
                  by those of the optional ones, None standing in for an
                  absent optional attribute

        @raises fxml.ParserError: if a required attribute is missing
        """
        out = []
        for k in required:
            if not node.hasAttribute(k):
                raise fxml.ParserError("Missing required attribute %s" % k)
            out.append(node.getAttribute(k))

        for k in optional:
            if node.hasAttribute(k):
                out.append(node.getAttribute(k))
            else:
                out.append(None)
        return out

    def _parsePlaylistEntry(self, parser, entry, piid):
        """
        Convert one 'entry' element and queue it for discovery.

        'duration' and 'offset' are given in seconds and may be fractional.

        @param parser: unused
        @param entry:  the 'entry' DOM element
        @param piid:   identifier to register the item under
        """
        mandatory = ['filename', 'time']
        optional = ['duration', 'offset']

        (filename, timestamp, duration, offset) = self._parseAttributes(
            entry, mandatory, optional)

        if duration is not None:
            duration = int(float(duration) * gst.SECOND)
        if offset is None:
            offset = 0
        # Parsed like the duration, so fractional seconds work here too.
        offset = int(float(offset) * gst.SECOND)

        timestamp = self._parseTimestamp(timestamp)

        # Assume UTF-8 filesystem.
        filename = filename.encode("UTF-8")

        self.addItemToPlaylist(filename, timestamp, duration, offset, piid)

    def _parseTimestamp(self, ts):
        """
        Take TS in YYYY-MM-DDThh:mm:ss.ssZ format, return timestamp in
        nanoseconds since the epoch

        @raises fxml.ParserError: if ts does not have that format
        """
        # time.strptime() doesn't handle the fractional seconds part. We ignore
        # it entirely, after verifying that it has the right format.
        tsmain, trailing = ts[:-4], ts[-4:]
        # The length check keeps too-short input from raising IndexError.
        if len(trailing) != 4 or trailing[0] != '.' or trailing[3] != 'Z' \
                or not trailing[1:3].isdigit():
            raise fxml.ParserError("Invalid timestamp %s" % ts)
        fmt = "%Y-%m-%dT%H:%M:%S"

        try:
            timestruct = time.strptime(tsmain, fmt)
            return int(calendar.timegm(timestruct) * gst.SECOND)
        except ValueError:
            raise fxml.ParserError("Invalid timestamp %s" % ts)
437