1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import errno
23 import os
24 import time
25 from datetime import datetime
26
27 import gobject
28 import gst
29 import time
30
31 from twisted.internet import reactor
32
33 from flumotion.component import feedcomponent
34 from flumotion.common import log, gstreamer, pygobject, messages, errors
35 from flumotion.common.format import strftime
36 from flumotion.common.i18n import N_, gettexter
37 from flumotion.common.mimetypes import mimeTypeToExtention
38 from flumotion.common.pygobject import gsignal
39
40 from flumotion.component.component import moods
41
42 __all__ = ['Disker']
43 __version__ = "$Rev: 6995 $"
44 T_ = gettexter()
45
46
47
48 """
49 Disker has a property 'ical-schedule'. This allows an ical file to be
50 specified in the config and have recordings scheduled based on events.
51 This file will be monitored for changes and events reloaded if this
52 happens.
53
54 The filename of a recording started from an ical file will be produced
55 via passing the ical event summary through strftime, so that an archive
56 can encode the date and time that it was begun.
57
58 The time that will be given to strftime will be given in the timezone of
59 the ical event. In practice this will either be UTC or the local time of
60 the machine running the disker, as the ical scheduler does not
61 understand arbitrary timezones.
62 """
63
64 try:
65
66 from icalendar import Calendar
67 from dateutil import rrule
68 HAS_ICAL = True
69 except ImportError:
70 HAS_ICAL = False
71
86
class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
    # Component that writes the stream it receives to files on disk by
    # handing file descriptors to a multifdsink element.
    componentMediumClass = DiskerMedium
    checkOffset = True
    # gst-launch style description of the sink; the element is looked up
    # later by its name 'fdsink'.
    pipe_template = 'multifdsink sync-method=1 name=fdsink mode=1 sync=false'
    # Currently open output file object, or None when no file is open.
    file = None
    # Directory in which recordings are created (taken from the
    # 'directory' property).
    directory = None
    # Full path of the file currently being written.
    location = None
    # Last caps negotiated on the pad; used to derive the mime type.
    caps = None
95
# Publish the initial UI state: no file yet, not recording, and whether
# ical scheduling is available (HAS_ICAL).
for key, value in (('filename', None),
                   ('recording', False),
                   ('can-schedule', HAS_ICAL)):
    self.uiState.addKey(key, value)
100
102 directory = properties['directory']
103
104 self.directory = directory
105
106 self.fixRenamedProperties(properties, [('rotateType', 'rotate-type')])
107
108 rotateType = properties.get('rotate-type', 'none')
109
110
111 if not rotateType in ['none', 'size', 'time']:
112 m = messages.Error(T_(N_(
113 "The configuration property 'rotate-type' should be set to "
114 "'size', time', or 'none', not '%s'. "
115 "Please fix the configuration."),
116 rotateType), mid='rotate-type')
117 self.addMessage(m)
118 raise errors.ComponentSetupHandledError()
119
120
121 if rotateType in ['size', 'time']:
122 if rotateType not in properties.keys():
123 m = messages.Error(T_(N_(
124 "The configuration property '%s' should be set. "
125 "Please fix the configuration."),
126 rotateType), mid='rotate-type')
127 self.addMessage(m)
128 raise errors.ComponentSetupHandledError()
129
130
131 if rotateType == 'size':
132 self.setSizeRotate(properties['size'])
133 elif rotateType == 'time':
134 self.setTimeRotate(properties['time'])
135
136
137 return self.pipe_template
138
"""
Schedule self._rotateTimeCallback to be called once the given number
of seconds has elapsed; the duration is also passed to the callback.

@param time: duration of file (in seconds)
"""
# NOTE: the parameter 'time' hides the time module within this method.
callback = self._rotateTimeCallback
reactor.callLater(time, callback, time)
144
"""
Schedule self._rotateSizeCallback to be called with the size limit
five seconds from now.

@param size: size of file (in bytes)
"""
delay = 5  # seconds before the callback fires
callback = self._rotateSizeCallback
reactor.callLater(delay, callback, size)
150
156
166
168 if self.caps:
169 return self.caps.get_structure(0).get_name()
170
172 mime = self.get_mime()
173 if mime == 'multipart/x-mixed-replace':
174 mime += ";boundary=ThisRandomString"
175 return mime
176
178 """
179 @param filenameTemplate: strftime formatted string to decide filename
180 @param timeOrTuple: a valid time to pass to strftime, defaulting
181 to time.localtime(). A 9-tuple may be passed instead.
182 """
183 mime = self.get_mime()
184 ext = mimeTypeToExtention(mime)
185
186 self.stop_recording()
187
188 sink = self.get_element('fdsink')
189 if sink.get_state() == gst.STATE_NULL:
190 sink.set_state(gst.STATE_READY)
191
192 filename = ""
193 if not filenameTemplate:
194 filenameTemplate = self._defaultFilenameTemplate
195 filename = "%s.%s" % (strftime(filenameTemplate,
196 timeOrTuple or time.localtime()), ext)
197 self.location = os.path.join(self.directory, filename)
198 self.info("Changing filename to %s", self.location)
199 try:
200 self.file = open(self.location, 'a')
201 except IOError, e:
202 self.warning("Failed to open output file %s: %s",
203 self.location, log.getExceptionMessage(e))
204 m = messages.Error(T_(N_(
205 "Failed to open output file '%s' for writing. "
206 "Check permissions on the file."), self.location))
207 self.addMessage(m)
208 return
209 self._plug_recording_started(self.file, self.location)
210 sink.emit('add', self.file.fileno())
211 self.uiState.set('filename', self.location)
212 self.uiState.set('recording', True)
213
214 if self.symlink_to_current_recording:
215 self.update_symlink(self.location,
216 self.symlink_to_current_recording)
217
219 if not dest.startswith('/'):
220 dest = os.path.join(self.directory, dest)
221 self.debug("updating symbolic link %s to point to %s", src, dest)
222 try:
223 try:
224 os.symlink(src, dest)
225 except OSError, e:
226 if e.errno == errno.EEXIST and os.path.islink(dest):
227 os.unlink(dest)
228 os.symlink(src, dest)
229 else:
230 raise
231 except Exception, e:
232 self.info("Failed to update link %s: %s", dest,
233 log.getExceptionMessage(e))
234 m = messages.Warning(T_(N_("Failed to update symbolic link "
235 "%s. Check your permissions."
236 % (dest,))),
237 debug=log.getExceptionMessage(e))
238 self.addMessage(m)
239
255
257 caps = pad.get_negotiated_caps()
258 if caps == None:
259 return
260
261 caps_str = gstreamer.caps_repr(caps)
262 self.debug('Got caps: %s' % caps_str)
263
264 new = True
265 if not self.caps == None:
266 self.warning('Already had caps: %s, replacing' % caps_str)
267 new = False
268
269 self.debug('Storing caps: %s' % caps_str)
270 self.caps = caps
271
272 if new and self._recordAtStart:
273 reactor.callLater(0, self.change_filename,
274 self._startFilenameTemplate)
275
276
277
279
280 if client_status == 4:
281 reactor.callFromThread(self._client_error_cb)
282
# React to a write error on the output file: close it, turn the
# component sad and post an error message.
#
# The file may already be gone (self.file is None) by the time this
# runs, so only close it when it is still there.
if self.file is not None:
    self.file.close()
    self.file = None

self.setMood(moods.sad)
messageId = "error-writing-%s" % self.location
# Pass the location as a separate argument so the untranslated format
# string, not the already formatted text, is the translation key.
m = messages.Error(T_(N_(
    "Error writing to file %s. Maybe disk is full."),
    self.location),
    mid=messageId, priority=40)
self.addMessage(m)
294
344
348
352
354 if HAS_ICAL:
355 cal = Calendar.from_string(icsStr)
356 if self.icalScheduler:
357 events = self.icalScheduler.parseCalendar(cal)
358 if events:
359 self.icalScheduler.addEvents(events)
360 else:
361 self.warning("No events found in the ical string")
362 else:
363 self.warning("Cannot parse ICAL; neccesary modules not installed")
364
# Tell every plug registered on the disker plug socket that a recording
# to the given file/location has started.
socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'

if socket in self.plugs:
    for plug in self.plugs[socket]:
        self.debug('invoking recording_started on '
                   'plug %r on socket %s', plug, socket)
        plug.recording_started(file, location)
374
# Tell every plug registered on the disker plug socket that the
# recording to the given file/location has stopped.
socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'

if socket in self.plugs:
    for plug in self.plugs[socket]:
        self.debug('invoking recording_stopped on '
                   'plug %r on socket %s', plug, socket)
        plug.recording_stopped(file, location)
384
386 if event.type == gst.EVENT_CUSTOM_DOWNSTREAM:
387 evt_struct = event.get_structure()
388 if evt_struct.get_name() == 'FluStreamMark':
389 if evt_struct['action'] == 'start':
390 self._on_marker_start(evt_struct['prog_id'])
391 elif evt_struct['action'] == 'stop':
392 self._on_marker_stop()
393 return True
394
397
399 tmpl = self._defaultFilenameTemplate
400 if self._marker_prefix:
401 try:
402 tmpl = '%s%s' % (self._marker_prefix % data,
403 self._defaultFilenameTemplate)
404 except TypeError, err:
405 m = messages.Warning(T_(N_('Failed expanding filename prefix: '
406 '%r <-- %r.'),
407 self._marker_prefix, data),
408 mid='expand-marker-prefix')
409 self.addMessage(m)
410 self.warning('Failed expanding filename prefix: '
411 '%r <-- %r; %r' %
412 (self._marker_prefix, data, err))
413 self.change_filename(tmpl)
414