Package flumotion :: Package worker :: Package checks :: Module video010
[hide private]

Source Code for Module flumotion.worker.checks.video010

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23   
 24  import gobject 
 25  import gst 
 26  import gst.interfaces 
 27  from twisted.internet.threads import deferToThread 
 28   
 29  from twisted.internet import defer 
 30  from flumotion.common import gstreamer, errors, log, messages 
 31  from flumotion.twisted import defer as fdefer 
 32   
 33  from flumotion.worker.checks import check 
 34   
 35  from flumotion.common.messages import N_ 
 36  T_ = messages.gettexter('flumotion') 
 37       
class BusResolution(fdefer.Resolution):
    """
    Resolution that keeps track of the pipeline under test and of the
    handler connected to that pipeline's bus, so both can be released
    by L{cleanup}.

    NOTE(review): cleanup is presumably invoked by the base Resolution when
    it is resolved; confirm in flumotion.twisted.defer.
    """
    # pipeline being checked; None when nothing needs tearing down
    pipeline = None
    # handler id returned by bus.connect(); None when no handler is connected
    watch_id = None

    def cleanup(self):
        """
        Detach the bus handler (if any), set the pipeline to NULL and
        forget both references.  Does nothing when no pipeline is set.
        """
        pipeline = self.pipeline
        if not pipeline:
            return

        if self.watch_id:
            bus = pipeline.get_bus()
            bus.remove_signal_watch()
            bus.disconnect(self.watch_id)
            self.watch_id = None

        pipeline.set_state(gst.STATE_NULL)
        self.pipeline = None
50
51 -def do_element_check(pipeline_str, element_name, check_proc, state=None, 52 set_state_deferred=False):
53 """ 54 Parse the given pipeline and set it to the given state. 55 When the bin reaches that state, perform the given check function on the 56 element with the given name. 57 58 @param pipeline_str: description of the pipeline used to test 59 @param element_name: name of the element being checked 60 @param check_proc: a function to call with the GstElement as argument. 61 @param state: an unused keyword parameter that will be removed when 62 support for GStreamer 0.8 is dropped. 63 @param set_state_deferred: a flag to say whether the set_state is run in 64 a deferToThread 65 @type set_state_deferred: bool 66 @returns: a deferred that will fire with the result of check_proc, or 67 fail. 68 @rtype: L{twisted.internet.defer.Deferred} 69 """ 70 def run_check(pipeline, resolution): 71 element = pipeline.get_by_name(element_name) 72 try: 73 retval = check_proc(element) 74 resolution.callback(retval) 75 except check.CheckProcError, e: 76 log.debug('check', 'CheckProcError when running %r: %r', 77 check_proc, e.data) 78 resolution.errback(errors.RemoteRunError(e.data)) 79 except Exception, e: 80 log.debug('check', 'Unhandled exception while running %r: %r', 81 check_proc, e) 82 resolution.errback(errors.RemoteRunError( 83 log.getExceptionMessage(e))) 84 # set pipeline state to NULL so worker does not consume 85 # unnecessary resources 86 pipeline.set_state(gst.STATE_NULL)
87 88 89 def message_rcvd(bus, message, pipeline, resolution): 90 t = message.type 91 if t == gst.MESSAGE_STATE_CHANGED: 92 if message.src == pipeline: 93 old, new, pending = message.parse_state_changed() 94 if new == gst.STATE_PLAYING: 95 run_check(pipeline, resolution) 96 elif t == gst.MESSAGE_ERROR: 97 gerror, debug = message.parse_error() 98 # set pipeline state to NULL so worker does not consume 99 # unnecessary resources 100 pipeline.set_state(gst.STATE_NULL) 101 resolution.errback(errors.GStreamerGstError(message.src, gerror, debug)) 102 elif t == gst.MESSAGE_EOS: 103 resolution.errback(errors.GStreamerError("Unexpected end of stream")) 104 else: 105 log.debug('check', 'message: %s: %s:' % ( 106 message.src.get_path_string(), 107 message.type.value_nicks[1])) 108 if message.structure: 109 log.debug('check', 'message: %s' % 110 message.structure.to_string()) 111 else: 112 log.debug('check', 'message: (no structure)') 113 return True 114 115 resolution = BusResolution() 116 117 log.debug('check', 'parsing pipeline %s' % pipeline_str) 118 try: 119 pipeline = gst.parse_launch(pipeline_str) 120 log.debug('check', 'parsed pipeline %s' % pipeline_str) 121 except gobject.GError, e: 122 resolution.errback(errors.GStreamerError(e.message)) 123 return resolution.d 124 125 bus = pipeline.get_bus() 126 bus.add_signal_watch() 127 watch_id = bus.connect('message', message_rcvd, pipeline, resolution) 128 129 resolution.watch_id = watch_id 130 resolution.pipeline = pipeline 131 log.debug('check', 'setting state to playing') 132 if set_state_deferred: 133 d = deferToThread(pipeline.set_state, gst.STATE_PLAYING) 134 def stateChanged(res): 135 return resolution.d 136 d.addCallback(stateChanged) 137 return d 138 else: 139 pipeline.set_state(gst.STATE_PLAYING) 140 return resolution.d 141
def check1394(id):
    """
    Probe the firewire device.

    Return a deferred firing a result.

    The result is either:
     - successful, with a None value: no device found
     - successful, with a dictionary of width, height, and par as a num/den
       pair
     - failed

    @param id: identifier set on every message added to the result

    @rtype: L{twisted.internet.defer.Deferred} of
            L{flumotion.common.messages.Result}
    """
    result = messages.Result()

    def do_check(demux):
        # read the negotiated video format off the demuxer's 'video' pad
        pad = demux.get_pad('video')

        # query the caps once and test by identity, so the caps object's
        # own comparison operator is never invoked against None
        caps = pad.get_negotiated_caps()
        if caps is None:
            raise errors.GStreamerError('Pipeline failed to negotiate?')

        s = caps.get_structure(0)
        w = s['width']
        h = s['height']
        par = s['pixel-aspect-ratio']
        # deliberately not called 'result', to avoid shadowing the
        # enclosing messages.Result
        props = dict(width=w, height=h, par=(par.num, par.denom))
        log.debug('check', 'returning dict %r' % props)
        return props

    # first check if the obvious device node exists
    if not os.path.exists('/dev/raw1394'):
        m = messages.Error(T_(N_("Device node /dev/raw1394 does not exist.")),
            id=id)
        result.add(m)
        return defer.succeed(result)

    pipeline = 'dv1394src name=source ! dvdemux name=demux ! fakesink'
    d = do_element_check(pipeline, 'demux', do_check)

    def errbackResult(failure):
        # turn a failed probe into a Result carrying one error message,
        # picking the most specific message available
        log.debug('check', 'returning failed Result, %r' % failure)
        m = None
        if failure.check(errors.GStreamerGstError):
            source, gerror, debug = failure.value.args
            log.debug('check', 'GStreamer GError: %s (debug: %s)' % (
                gerror.message, debug))
            if gerror.domain == "gst-resource-error-quark":
                if gerror.code == int(gst.RESOURCE_ERROR_NOT_FOUND):
                    # dv1394src was fixed after gst-plugins-good 0.10.2
                    # to distinguish NOT_FOUND and OPEN_READ
                    version = gstreamer.get_plugin_version('1394')
                    if version >= (0,10,0,0) and version <= (0,10,2,0):
                        m = messages.Error(T_(
                            N_("Could not find or open the Firewire device. "
                               "Check the device node and its permissions.")))
                    else:
                        m = messages.Error(T_(
                            N_("No Firewire device found.")))
                elif gerror.code == int(gst.RESOURCE_ERROR_OPEN_READ):
                    m = messages.Error(T_(
                        N_("Could not open Firewire device for reading. "
                           "Check permissions on the device.")))

            # no Firewire-specific message: try the generic device handler
            if not m:
                m = check.handleGStreamerDeviceError(failure, 'Firewire')

        # last resort: generic message with the failure attached as debug
        if not m:
            m = messages.Error(T_(N_("Could not probe Firewire device.")),
                debug=check.debugFailure(failure))

        m.id = id
        result.add(m)
        return result

    d.addCallback(check.callbackResult, result)
    d.addErrback(errbackResult)

    return d