1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23
24 import gobject
25 import gst
26 import gst.interfaces
27 from twisted.internet.threads import deferToThread
28 from twisted.internet import defer
29
30 from flumotion.common import gstreamer, errors, log, messages
31 from flumotion.common.i18n import N_, gettexter
32 from flumotion.twisted import defer as fdefer
33 from flumotion.worker.checks import check
34
35 __version__ = "$Rev: 7094 $"
36 T_ = gettexter()
37
38
51
52 -def do_element_check(pipeline_str, element_name, check_proc, state=None,
53 set_state_deferred=False):
54 """
55 Parse the given pipeline and set it to the given state.
56 When the bin reaches that state, perform the given check function on the
57 element with the given name.
58
59 @param pipeline_str: description of the pipeline used to test
60 @param element_name: name of the element being checked
61 @param check_proc: a function to call with the GstElement as argument.
62 @param state: an unused keyword parameter that will be removed when
63 support for GStreamer 0.8 is dropped.
64 @param set_state_deferred: a flag to say whether the set_state is run in
65 a deferToThread
66 @type set_state_deferred: bool
67 @returns: a deferred that will fire with the result of check_proc, or
68 fail.
69 @rtype: L{twisted.internet.defer.Deferred}
70 """
71 def run_check(pipeline, resolution):
72 element = pipeline.get_by_name(element_name)
73 try:
74 retval = check_proc(element)
75 resolution.callback(retval)
76 except check.CheckProcError, e:
77 log.debug('check', 'CheckProcError when running %r: %r',
78 check_proc, e.data)
79 resolution.errback(errors.RemoteRunError(e.data))
80 except Exception, e:
81 log.debug('check', 'Unhandled exception while running %r: %r',
82 check_proc, e)
83 resolution.errback(errors.RemoteRunError(
84 log.getExceptionMessage(e)))
85
86
87 pipeline.set_state(gst.STATE_NULL)
88
89
90 def message_rcvd(bus, message, pipeline, resolution):
91 t = message.type
92 if t == gst.MESSAGE_STATE_CHANGED:
93 if message.src == pipeline:
94 old, new, pending = message.parse_state_changed()
95 if new == gst.STATE_PLAYING:
96 run_check(pipeline, resolution)
97 elif t == gst.MESSAGE_ERROR:
98 gerror, debug = message.parse_error()
99
100
101 pipeline.set_state(gst.STATE_NULL)
102 resolution.errback(errors.GStreamerGstError(message.src, gerror, debug))
103 elif t == gst.MESSAGE_EOS:
104 resolution.errback(errors.GStreamerError("Unexpected end of stream"))
105 else:
106 log.debug('check', 'message: %s: %s:' % (
107 message.src.get_path_string(),
108 message.type.value_nicks[1]))
109 if message.structure:
110 log.debug('check', 'message: %s' %
111 message.structure.to_string())
112 else:
113 log.debug('check', 'message: (no structure)')
114 return True
115
116 resolution = BusResolution()
117
118 log.debug('check', 'parsing pipeline %s' % pipeline_str)
119 try:
120 pipeline = gst.parse_launch(pipeline_str)
121 log.debug('check', 'parsed pipeline %s' % pipeline_str)
122 except gobject.GError, e:
123 resolution.errback(errors.GStreamerError(e.message))
124 return resolution.d
125
126 bus = pipeline.get_bus()
127 bus.add_signal_watch()
128 signal_id = bus.connect('message', message_rcvd, pipeline, resolution)
129
130 resolution.signal_id = signal_id
131 resolution.pipeline = pipeline
132 log.debug('check', 'setting state to playing')
133 if set_state_deferred:
134 d = deferToThread(pipeline.set_state, gst.STATE_PLAYING)
135 def stateChanged(res):
136 return resolution.d
137 d.addCallback(stateChanged)
138 return d
139 else:
140 pipeline.set_state(gst.STATE_PLAYING)
141 return resolution.d
142
144 """
145 Probe the firewire device.
146
147 Return a deferred firing a result.
148
149 The result is either:
    - successful, with a None value: no device found
    - successful, with a dictionary of width, height, and par as a num/den pair
152 - failed
153
154 @param mid: the id to set on the message.
155
156 @rtype: L{twisted.internet.defer.Deferred} of
157 L{flumotion.common.messages.Result}
158 """
159 result = messages.Result()
160
def do_check(demux):
    """
    Read the negotiated video format from the 'video' pad of an element.

    @param demux: the element to inspect; its pad named 'video' is queried.

    @returns: a dict with the keys width, height and par, where par is a
              (numerator, denominator) pair.
    @rtype:   dict

    @raises errors.GStreamerError: if the element has no 'video' pad or no
                                   caps have been negotiated on it.
    """
    pad = demux.get_pad('video')
    if pad is None:
        # assumes get_pad() yields None for a pad that does not exist
        # (yet) -- TODO confirm.  Failing explicitly here avoids an opaque
        # AttributeError on the caps query below.
        raise errors.GStreamerError('No video pad found on the demuxer')

    # Query the caps once, and test with 'is None': identity is what is
    # meant, and it avoids running a rich comparison against None.
    caps = pad.get_negotiated_caps()
    if caps is None:
        raise errors.GStreamerError('Pipeline failed to negotiate?')

    structure = caps.get_structure(0)
    width = structure['width']
    height = structure['height']
    par = structure['pixel-aspect-ratio']

    # Deliberately not called 'result', so that it does not shadow the
    # messages.Result object of the enclosing scope.
    info = dict(width=width, height=height, par=(par.num, par.denom))
    log.debug('check', 'returning dict %r' % info)
    return info
175
176
177 if not os.path.exists('/dev/raw1394'):
178 m = messages.Error(T_(N_("Device node /dev/raw1394 does not exist.")),
179 id=mid)
180 result.add(m)
181 return defer.succeed(result)
182
183 pipeline = 'dv1394src name=source ! dvdemux name=demux ! fakesink'
184 d = do_element_check(pipeline, 'demux', do_check)
185
def errbackResult(failure):
    """
    Convert a failed probe into a Result carrying one error message.

    The most specific message available is picked: first the Firewire
    specific ones for GStreamer resource errors, then whatever
    check.handleGStreamerDeviceError comes up with, and finally a generic
    "could not probe" message with the failure as debug information.

    @param failure: the failure to translate into a message

    @returns: the result object of the enclosing scope, with the message
              (its id set to mid) added to it.
    """
    log.debug('check', 'returning failed Result, %r' % failure)

    msg = None
    if failure.check(errors.GStreamerGstError):
        _source, gerror, debug = failure.value.args
        log.debug('check', 'GStreamer GError: %s (debug: %s)' % (
            gerror.message, debug))

        if gerror.domain == "gst-resource-error-quark":
            code = gerror.code
            if code == int(gst.RESOURCE_ERROR_NOT_FOUND):
                # presumably 1394 plugin versions 0.10.0 up to 0.10.2 could
                # not tell "not found" from "cannot open", hence the
                # broader wording for them -- confirm.
                plugin_version = gstreamer.get_plugin_version('1394')
                ambiguous = (plugin_version >= (0, 10, 0, 0)
                             and plugin_version <= (0, 10, 2, 0))
                if ambiguous:
                    msg = messages.Error(T_(
                        N_("Could not find or open the Firewire device. "
                           "Check the device node and its permissions.")))
                else:
                    msg = messages.Error(T_(
                        N_("No Firewire device found.")))
            elif code == int(gst.RESOURCE_ERROR_OPEN_READ):
                msg = messages.Error(T_(
                    N_("Could not open Firewire device for reading. "
                       "Check permissions on the device.")))

        # NOTE(review): indentation was lost in the reviewed copy; this
        # fallback is assumed to belong to the GStreamerGstError branch --
        # confirm against the original file.
        if not msg:
            msg = check.handleGStreamerDeviceError(failure, 'Firewire',
                                                   mid=mid)

    if not msg:
        # Nothing more specific is known: report a generic probe failure.
        msg = messages.Error(T_(N_("Could not probe Firewire device.")),
                             debug=check.debugFailure(failure))

    msg.id = mid
    result.add(msg)
    return result
221 d.addCallback(check.callbackResult, result)
222 d.addErrback(errbackResult)
223
224 return d
225