Package flumotion :: Package worker :: Package checks :: Module gst010
[hide private]

Source Code for Module flumotion.worker.checks.gst010

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23   
 24  import gobject 
 25  import gst 
 26  import gst.interfaces 
 27  from twisted.internet.threads import deferToThread 
 28  from twisted.internet import defer 
 29   
 30  from flumotion.common import gstreamer, errors, log, messages 
 31  from flumotion.common.i18n import N_, gettexter 
 32  from flumotion.twisted import defer as fdefer 
 33  from flumotion.worker.checks import check 
 34   
 35  __version__ = "$Rev: 7094 $" 
 36  T_ = gettexter() 
 37   
 38   
class BusResolution(fdefer.Resolution):
    """
    Resolution that additionally remembers the pipeline under test and the
    handler id of the 'message' connection made on that pipeline's bus, so
    that both can be released again by L{cleanup}.
    """
    # handler id of the bus 'message' connection; None when not connected
    signal_id = None
    # pipeline being checked; None when there is nothing to tear down
    pipeline = None

    def cleanup(self):
        """
        Release the bus watch and shut down the pipeline, if any.

        Does nothing when no pipeline is set.  Otherwise, when a signal id
        is recorded, the bus signal watch is removed and the handler is
        disconnected; the pipeline is then set to NULL and forgotten, so a
        second call is a no-op.
        """
        pipeline = self.pipeline
        if not pipeline:
            return

        handler_id = self.signal_id
        if handler_id:
            pipeline.get_bus().remove_signal_watch()
            pipeline.get_bus().disconnect(handler_id)
            self.signal_id = None

        # NULL state so the pipeline does not keep holding resources
        pipeline.set_state(gst.STATE_NULL)
        self.pipeline = None

def do_element_check(pipeline_str, element_name, check_proc, state=None,
    set_state_deferred=False):
    """
    Parse the given pipeline and set it to PLAYING.
    When the pipeline itself reports having reached PLAYING, perform the
    given check function on the element with the given name.

    An ERROR or EOS message on the bus before that point fails the returned
    deferred instead.  A pipeline description that cannot be parsed fails
    the deferred immediately with L{errors.GStreamerError}.

    @param pipeline_str: description of the pipeline used to test
    @param element_name: name of the element being checked
    @param check_proc: a function to call with the GstElement as argument.
                       It may raise L{check.CheckProcError}; any exception
                       it raises is converted to L{errors.RemoteRunError}.
    @param state: an unused keyword parameter that will be removed when
                  support for GStreamer 0.8 is dropped.
    @param set_state_deferred: a flag to say whether the set_state is run in
                               a deferToThread
    @type  set_state_deferred: bool
    @returns: a deferred that will fire with the result of check_proc, or
              fail.
    @rtype: L{twisted.internet.defer.Deferred}
    """
    def run_check(pipeline, resolution):
        # Look up the element and hand it to check_proc; the outcome (value
        # or failure) is routed through the resolution.
        element = pipeline.get_by_name(element_name)
        try:
            retval = check_proc(element)
            resolution.callback(retval)
        except check.CheckProcError, e:
            # check_proc signalled a failure itself; forward its data
            log.debug('check', 'CheckProcError when running %r: %r',
                      check_proc, e.data)
            resolution.errback(errors.RemoteRunError(e.data))
        except Exception, e:
            # anything else is reported by its exception message
            log.debug('check', 'Unhandled exception while running %r: %r',
                      check_proc, e)
            resolution.errback(errors.RemoteRunError(
                log.getExceptionMessage(e)))
        # set pipeline state to NULL so worker does not consume
        # unnecessary resources
        pipeline.set_state(gst.STATE_NULL)


    def message_rcvd(bus, message, pipeline, resolution):
        # Handler for the bus 'message' signal.  Always returns True.
        t = message.type
        if t == gst.MESSAGE_STATE_CHANGED:
            # only state changes of the pipeline itself are of interest,
            # not those of its children
            if message.src == pipeline:
                old, new, pending = message.parse_state_changed()
                if new == gst.STATE_PLAYING:
                    run_check(pipeline, resolution)
        elif t == gst.MESSAGE_ERROR:
            gerror, debug = message.parse_error()
            # set pipeline state to NULL so worker does not consume
            # unnecessary resources
            pipeline.set_state(gst.STATE_NULL)
            resolution.errback(errors.GStreamerGstError(message.src, gerror, debug))
        elif t == gst.MESSAGE_EOS:
            resolution.errback(errors.GStreamerError("Unexpected end of stream"))
        else:
            # any other message type is only logged
            # NOTE(review): index 1 of value_nicks is assumed to be the
            # nick of this message type -- confirm
            log.debug('check', 'message: %s: %s:' % (
                message.src.get_path_string(),
                message.type.value_nicks[1]))
            if message.structure:
                log.debug('check', 'message: %s' %
                    message.structure.to_string())
            else:
                log.debug('check', 'message: (no structure)')
        return True

    resolution = BusResolution()

    log.debug('check', 'parsing pipeline %s' % pipeline_str)
    try:
        pipeline = gst.parse_launch(pipeline_str)
        log.debug('check', 'parsed pipeline %s' % pipeline_str)
    except gobject.GError, e:
        # no pipeline exists yet, so there is nothing to tear down
        resolution.errback(errors.GStreamerError(e.message))
        return resolution.d

    bus = pipeline.get_bus()
    bus.add_signal_watch()
    signal_id = bus.connect('message', message_rcvd, pipeline, resolution)

    # record what BusResolution.cleanup needs to undo the watch and
    # connection made above
    resolution.signal_id = signal_id
    resolution.pipeline = pipeline
    log.debug('check', 'setting state to playing')
    if set_state_deferred:
        # run the state change in a thread; the returned deferred is chained
        # to resolution.d, so callers still receive the check result
        d = deferToThread(pipeline.set_state, gst.STATE_PLAYING)
        def stateChanged(res):
            return resolution.d
        d.addCallback(stateChanged)
        return d
    else:
        pipeline.set_state(gst.STATE_PLAYING)
        return resolution.d

def check1394(mid):
    """
    Probe the firewire device.

    Return a deferred firing a result.

    The result is either:
     - successful, with a None value: no device found
     - successful, with a dictionary of width, height, and par as a num/den
       pair
     - failed

    When the device node /dev/raw1394 does not exist, no pipeline is run and
    the deferred fires right away with a Result carrying an Error message.
    Failures of the probing pipeline are likewise turned into an Error
    message (with id C{mid}) added to the Result.

    @param mid: the id to set on the message.

    @rtype: L{twisted.internet.defer.Deferred} of
            L{flumotion.common.messages.Result}
    """
    result = messages.Result()

    def do_check(demux):
        # Read the negotiated video format from the demuxer's 'video' pad.
        pad = demux.get_pad('video')

        # get_pad() gives None when the demuxer has no 'video' pad; treat
        # that like a pad without negotiated caps instead of failing with
        # an AttributeError on None.
        caps = None
        if pad is not None:
            caps = pad.get_negotiated_caps()
        if caps is None:
            raise errors.GStreamerError('Pipeline failed to negotiate?')

        s = caps.get_structure(0)
        w = s['width']
        h = s['height']
        par = s['pixel-aspect-ratio']
        # named differently from the enclosing 'result' to avoid shadowing
        probed = dict(width=w, height=h, par=(par.num, par.denom))
        log.debug('check', 'returning dict %r' % probed)
        return probed

    # first check if the obvious device node exists
    if not os.path.exists('/dev/raw1394'):
        m = messages.Error(T_(N_("Device node /dev/raw1394 does not exist.")),
                           id=mid)
        result.add(m)
        return defer.succeed(result)

    pipeline = 'dv1394src name=source ! dvdemux name=demux ! fakesink'
    d = do_element_check(pipeline, 'demux', do_check)

    def errbackResult(failure):
        # Translate a probing failure into an Error message on the Result;
        # the Result is returned, so the deferred continues on its callback
        # chain.
        log.debug('check', 'returning failed Result, %r' % failure)
        m = None
        if failure.check(errors.GStreamerGstError):
            source, gerror, debug = failure.value.args
            log.debug('check', 'GStreamer GError: %s (debug: %s)' % (
                gerror.message, debug))
            if gerror.domain == "gst-resource-error-quark":
                if gerror.code == int(gst.RESOURCE_ERROR_NOT_FOUND):
                    # dv1394src was fixed after gst-plugins-good 0.10.2
                    # to distinguish NOT_FOUND and OPEN_READ
                    version = gstreamer.get_plugin_version('1394')
                    if (0, 10, 0, 0) <= version <= (0, 10, 2, 0):
                        m = messages.Error(T_(
                            N_("Could not find or open the Firewire device. "
                               "Check the device node and its permissions.")))
                    else:
                        m = messages.Error(T_(
                            N_("No Firewire device found.")))
                elif gerror.code == int(gst.RESOURCE_ERROR_OPEN_READ):
                    m = messages.Error(T_(
                        N_("Could not open Firewire device for reading. "
                           "Check permissions on the device.")))

            if not m:
                # no Firewire-specific message; try the generic device
                # error handling
                m = check.handleGStreamerDeviceError(failure, 'Firewire',
                                                     mid=mid)

        if not m:
            # last resort: generic message with the failure as debug info
            m = messages.Error(T_(N_("Could not probe Firewire device.")),
                               debug=check.debugFailure(failure))

        m.id = mid
        result.add(m)
        return result

    # presumably check.callbackResult stores the probed value on result
    # and returns it -- confirm against flumotion.worker.checks.check
    d.addCallback(check.callbackResult, result)
    d.addErrback(errbackResult)

    return d
