
Source Code for Module flumotion.component.consumers.httpstreamer.httpstreamer

# -*- Mode: Python; test-case-name: flumotion.test.test_http -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

import time

import gst
from twisted.cred import credentials
from twisted.internet import reactor, error, defer
from twisted.web import server
from zope.interface import implements

from flumotion.common import gstreamer, errors
from flumotion.common import messages, netutils, interfaces
from flumotion.common.format import formatStorage, formatTime
from flumotion.common.i18n import N_, gettexter
from flumotion.component import feedcomponent
from flumotion.component.base import http
from flumotion.component.component import moods
from flumotion.component.consumers.httpstreamer import resources
from flumotion.component.misc.porter import porterclient
from flumotion.twisted import fdserver

__all__ = ['HTTPMedium', 'MultifdSinkStreamer']
__version__ = "$Rev: 6981 $"
T_ = gettexter()
STATS_POLL_INTERVAL = 10
UI_UPDATE_THROTTLE_PERIOD = 2.0 # Don't update UI more than once every two
                                # seconds

# FIXME: generalize this class and move it out here ?
class Stats:
    def __init__(self, sink):
        self.sink = sink

        self.no_clients = 0
        self.clients_added_count = 0
        self.clients_removed_count = 0
        self.start_time = time.time()
        # keep track of the highest number and the last epoch this was reached
        self.peak_client_number = 0
        self.peak_epoch = self.start_time
        self.load_deltas = [0, 0]
        self._load_deltas_period = 10 # seconds
        self._load_deltas_ongoing = [time.time(), 0, 0]
        self._currentBitrate = -1 # not known yet
        self._lastBytesReceived = -1 # not known yet

        # keep track of average clients by tracking last average and its time
        self.average_client_number = 0
        self.average_time = self.start_time

        self.hostname = "localhost"
        self.port = 0
        self.mountPoint = "/"

    def _updateAverage(self):
        # update running average of clients connected
        now = time.time()
        # calculate deltas
        dt1 = self.average_time - self.start_time
        dc1 = self.average_client_number
        dt2 = now - self.average_time
        dc2 = self.no_clients
        self.average_time = now # we can update now that we used self.a
        if dt1 == 0:
            # first measurement
            self.average_client_number = 0
        else:
            dt = dt1 + dt2
            before = (dc1 * dt1) / dt
            after = dc2 * dt2 / dt
            self.average_client_number = before + after

    def clientAdded(self):
        self._updateAverage()

        self.no_clients += 1
        self.clients_added_count += 1

        # >= so we get the last epoch this peak was achieved
        if self.no_clients >= self.peak_client_number:
            self.peak_epoch = time.time()
            self.peak_client_number = self.no_clients

    def clientRemoved(self):
        self._updateAverage()
        self.no_clients -= 1
        self.clients_removed_count += 1

    def _updateStats(self):
        """
        Periodically, update our statistics on load deltas, and update the
        UIState with new values for total bytes, bitrate, etc.
        """

        oldtime, oldadd, oldremove = self._load_deltas_ongoing
        add, remove = self.clients_added_count, self.clients_removed_count
        now = time.time()
        diff = float(now - oldtime)

        self.load_deltas = [(add - oldadd) / diff, (remove - oldremove) / diff]
        self._load_deltas_ongoing = [now, add, remove]

        bytesReceived = self.getBytesReceived()
        if self._lastBytesReceived >= 0:
            self._currentBitrate = ((bytesReceived - self._lastBytesReceived) *
                                    8 / STATS_POLL_INTERVAL)
        self._lastBytesReceived = bytesReceived

        self.update_ui_state()

        self._updateCallLaterId = reactor.callLater(STATS_POLL_INTERVAL,
                                                    self._updateStats)

    def getCurrentBitrate(self):
        if self._currentBitrate >= 0:
            return self._currentBitrate
        else:
            return self.getBytesReceived() * 8 / self.getUptime()

    def getBytesSent(self):
        return self.sink.get_property('bytes-served')

    def getBytesReceived(self):
        return self.sink.get_property('bytes-to-serve')

    def getUptime(self):
        return time.time() - self.start_time

    def getClients(self):
        return self.no_clients

    def getPeakClients(self):
        return self.peak_client_number

    def getPeakEpoch(self):
        return self.peak_epoch

    def getAverageClients(self):
        return self.average_client_number

    def getUrl(self):
        return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)

    def getLoadDeltas(self):
        return self.load_deltas

    def updateState(self, set):
        c = self

        bytes_sent = c.getBytesSent()
        bytes_received = c.getBytesReceived()
        uptime = c.getUptime()

        set('stream-mime', c.get_mime())
        set('stream-url', c.getUrl())
        set('stream-uptime', formatTime(uptime))
        bitspeed = bytes_received * 8 / uptime
        currentbitrate = self.getCurrentBitrate()
        set('stream-bitrate', formatStorage(bitspeed) + 'bit/s')
        set('stream-current-bitrate',
            formatStorage(currentbitrate) + 'bit/s')
        set('stream-totalbytes', formatStorage(bytes_received) + 'Byte')
        set('stream-bitrate-raw', bitspeed)
        set('stream-totalbytes-raw', bytes_received)

        set('clients-current', str(c.getClients()))
        set('clients-max', str(c.getMaxClients()))
        set('clients-peak', str(c.getPeakClients()))
        set('clients-peak-time', c.getPeakEpoch())
        set('clients-average', str(int(c.getAverageClients())))

        bitspeed = bytes_sent * 8 / uptime
        set('consumption-bitrate', formatStorage(bitspeed) + 'bit/s')
        set('consumption-bitrate-current',
            formatStorage(currentbitrate * c.getClients()) + 'bit/s')
        set('consumption-totalbytes', formatStorage(bytes_sent) + 'Byte')
        set('consumption-bitrate-raw', bitspeed)
        set('consumption-totalbytes-raw', bytes_sent)

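Stats._updateAverage keeps a running, time-weighted average of connected clients: the previous average is weighted by how long it was in effect and combined with the current client count weighted by the time since the last change. A minimal standalone sketch of the same arithmetic, with made-up sample durations (not part of the module):

# Illustrative sketch only: a time-weighted average of a client count,
# mirroring the bookkeeping done in Stats._updateAverage() above.
def weighted_average(samples):
    """samples: list of (duration_seconds, client_count) pairs."""
    total = sum(duration for duration, _ in samples)
    if total == 0:
        return 0
    return sum(duration * count for duration, count in samples) / float(total)

# 60 s with 0 clients, 120 s with 10 clients, 60 s with 4 clients
print weighted_average([(60, 0), (120, 10), (60, 4)])  # -> 6.0
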
class HTTPMedium(feedcomponent.FeedComponentMedium):
    def __init__(self, comp):
        """
        @type comp: L{Stats}
        """
        feedcomponent.FeedComponentMedium.__init__(self, comp)

    def authenticate(self, bouncerName, keycard):
        """
        @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
        """
        d = self.callRemote('authenticate', bouncerName, keycard)
        return d

    def keepAlive(self, bouncerName, issuerName, ttl):
        """
        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('keepAlive', bouncerName, issuerName, ttl)

    def removeKeycardId(self, bouncerName, keycardId):
        """
        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('removeKeycardId', bouncerName, keycardId)

    ### remote methods for manager to call on
    def remote_expireKeycard(self, keycardId):
        self.comp.httpauth.expireKeycard(keycardId)

    def remote_notifyState(self):
        self.comp.update_ui_state()

    def remote_rotateLog(self):
        self.comp.resource.rotateLogs()

    def remote_getStreamData(self):
        return self.comp.getStreamData()

    def remote_getLoadData(self):
        return self.comp.getLoadData()

    def remote_updatePorterDetails(self, path, username, password):
        return self.comp.updatePorterDetails(path, username, password)


### the actual component is a streamer using multifdsink
class MultifdSinkStreamer(feedcomponent.ParseLaunchComponent, Stats):
    implements(interfaces.IStreamingComponent)

    checkOffset = True

    # this object is given to the HTTPMedium as comp
    logCategory = 'cons-http'

    pipe_template = 'multifdsink name=sink ' + \
                    'sync=false ' + \
                    'recover-policy=3'

    componentMediumClass = HTTPMedium

    def init(self):
        reactor.debug = True
        self.debug("HTTP streamer initialising")

        self.caps = None
        self.resource = None
        self.httpauth = None
        self.mountPoint = None
        self.burst_on_connect = False

        self.description = None

        self.type = None

        # Used if we've slaved to a porter.
        self._pbclient = None
        self._porterUsername = None
        self._porterPassword = None
        self._porterPath = None

        # Or if we're a master, we open our own port here. Also used for URLs
        # in the porter case.
        self.port = None
        # We listen on this interface, if set.
        self.iface = None

        self._tport = None

        self._updateCallLaterId = None
        self._lastUpdate = 0
        self._updateUI_DC = None

        self._pending_removals = {}

        for i in ('stream-mime', 'stream-uptime', 'stream-current-bitrate',
                  'stream-bitrate', 'stream-totalbytes', 'clients-current',
                  'clients-max', 'clients-peak', 'clients-peak-time',
                  'clients-average', 'consumption-bitrate',
                  'consumption-bitrate-current',
                  'consumption-totalbytes', 'stream-bitrate-raw',
                  'stream-totalbytes-raw', 'consumption-bitrate-raw',
                  'consumption-totalbytes-raw', 'stream-url'):
            self.uiState.addKey(i, None)

    def getDescription(self):
        return self.description

    def get_pipeline_string(self, properties):
        return self.pipe_template

    def check_properties(self, props, addMessage):

        # F0.6: remove backwards-compatible properties
        self.fixRenamedProperties(props, [
            ('issuer', 'issuer-class'),
            ('mount_point', 'mount-point'),
            ('porter_socket_path', 'porter-socket-path'),
            ('porter_username', 'porter-username'),
            ('porter_password', 'porter-password'),
            ('user_limit', 'client-limit'),
            ('bandwidth_limit', 'bandwidth-limit'),
            ('burst_on_connect', 'burst-on-connect'),
            ('burst_size', 'burst-size'),
            ])

        if props.get('type', 'master') == 'slave':
            for k in 'socket-path', 'username', 'password':
                if not 'porter-' + k in props:
                    raise errors.ConfigError("slave mode, missing required"
                                             " property 'porter-%s'" % k)

        if 'burst-size' in props and 'burst-time' in props:
            raise errors.ConfigError('both burst-size and burst-time '
                                     'set, cannot satisfy')

        # tcp is where multifdsink is
        version = gstreamer.get_plugin_version('tcp')
        if version < (0, 10, 9, 1):
            m = messages.Error(T_(N_(
                "Version %s of the '%s' GStreamer plug-in is too old.\n"),
                ".".join(map(str, version)), 'multifdsink'))
            m.add(T_(N_("Please upgrade '%s' to version %s."),
                     'gst-plugins-base', '0.10.10'))
            addMessage(m)

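For reference, check_properties above requires all three porter-* properties whenever type is 'slave', and refuses configurations that set both burst-size and burst-time. A hypothetical property set that would pass those checks (all values invented for illustration; in master mode the porter-* keys can be dropped and the streamer listens on 'port', which defaults to 8800 in configure_pipeline below):

# Illustrative sketch only: a hypothetical slave-mode configuration that
# satisfies the checks in check_properties() above. All values are examples.
slave_props = {
    'type': 'slave',
    'mount-point': '/live',
    'porter-socket-path': '/tmp/flu-porter.socket',
    'porter-username': 'porteruser',
    'porter-password': 'porterpass',
    'burst-on-connect': True,
    'burst-size': 256,  # kB; must not be combined with burst-time
}
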
    def time_bursting_supported(self, sink):
        try:
            sink.get_property('units-max')
            return True
        except TypeError:
            return False

    def setup_burst_mode(self, sink):
        if self.burst_on_connect:
            if self.burst_time and self.time_bursting_supported(sink):
                self.debug("Configuring burst mode for %f second burst",
                           self.burst_time)
                # Set a burst for configurable minimum time, plus extra to
                # start from a keyframe if needed.
                sink.set_property('sync-method', 4) # burst-keyframe
                sink.set_property('burst-unit', 2) # time
                sink.set_property('burst-value',
                                  long(self.burst_time * gst.SECOND))

                # We also want to ensure that we have sufficient data available
                # to satisfy this burst; and an appropriate maximum, all
                # specified in units of time.
                sink.set_property('time-min',
                                  long((self.burst_time + 5) * gst.SECOND))

                sink.set_property('unit-type', 2) # time
                sink.set_property('units-soft-max',
                                  long((self.burst_time + 8) * gst.SECOND))
                sink.set_property('units-max',
                                  long((self.burst_time + 10) * gst.SECOND))
            elif self.burst_size:
                self.debug("Configuring burst mode for %d kB burst",
                           self.burst_size)
                # If we have a burst-size set, use modern
                # needs-recent-multifdsink behaviour to have complex bursting.
                # In this mode, we burst a configurable minimum, plus extra
                # so we start from a keyframe (or less if we don't have a
                # keyframe available)
                sink.set_property('sync-method', 'burst-keyframe')
                sink.set_property('burst-unit', 'bytes')
                sink.set_property('burst-value', self.burst_size * 1024)

                # To use burst-on-connect, we need to ensure that multifdsink
                # has a minimum amount of data available - assume 512 kB beyond
                # the burst amount so that we should have a keyframe available
                sink.set_property('bytes-min', (self.burst_size + 512) * 1024)

                # And then we need a maximum still further above that - the
                # exact value doesn't matter too much, but we want it reasonably
                # small to limit memory usage. multifdsink doesn't give us much
                # control here, we can only specify the max values in buffers.
                # We assume each buffer is close enough to 4kB - true for asf
                # and ogg, at least
                sink.set_property('buffers-soft-max',
                                  (self.burst_size + 1024) / 4)
                sink.set_property('buffers-max',
                                  (self.burst_size + 2048) / 4)

            else:
                # Old behaviour; simple burst-from-latest-keyframe
                self.debug("simple burst-on-connect, setting sync-method 2")
                sink.set_property('sync-method', 2)

                sink.set_property('buffers-soft-max', 250)
                sink.set_property('buffers-max', 500)
        else:
            self.debug("no burst-on-connect, setting sync-method 0")
            sink.set_property('sync-method', 0)

            sink.set_property('buffers-soft-max', 250)
            sink.set_property('buffers-max', 500)

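As a worked example of the byte-based branch of setup_burst_mode, here is a small sketch (assuming a hypothetical burst-size of 256 kB) of the values that end up on the multifdsink properties:

# Illustrative sketch only: the byte-based burst settings computed for a
# hypothetical burst-size of 256 kB, mirroring setup_burst_mode() above.
burst_size = 256                                     # kB, example value
print 'burst-value', burst_size * 1024               # 262144 bytes
print 'bytes-min', (burst_size + 512) * 1024         # 786432 bytes
print 'buffers-soft-max', (burst_size + 1024) / 4    # 320 buffers (~4 kB each)
print 'buffers-max', (burst_size + 2048) / 4         # 576 buffers
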
    def configure_pipeline(self, pipeline, properties):
        Stats.__init__(self, sink=self.get_element('sink'))

        self._updateCallLaterId = reactor.callLater(10, self._updateStats)

        mountPoint = properties.get('mount-point', '')
        if not mountPoint.startswith('/'):
            mountPoint = '/' + mountPoint
        self.mountPoint = mountPoint

        # Hostname is used for a variety of purposes. We do a best-effort guess
        # where nothing else is possible, but it's much preferable to just
        # configure this
        self.hostname = properties.get('hostname', None)
        self.iface = self.hostname # We listen on this if explicitly configured,
                                   # but not if it's only guessed at by the
                                   # below code.
        if not self.hostname:
            # Don't call this nasty, nasty, probably flaky function unless we
            # need to.
            self.hostname = netutils.guess_public_hostname()

        self.description = properties.get('description', None)
        if self.description is None:
            self.description = "Flumotion Stream"

        # FIXME: tie these together more nicely
        self.httpauth = http.HTTPAuthentication(self)
        self.resource = resources.HTTPStreamingResource(self,
                                                        self.httpauth)

        # check how to set client sync mode
        sink = self.get_element('sink')
        self.burst_on_connect = properties.get('burst-on-connect', False)
        self.burst_size = properties.get('burst-size', 0)
        self.burst_time = properties.get('burst-time', 0.0)

        self.setup_burst_mode(sink)

        sink.connect('deep-notify::caps', self._notify_caps_cb)

        # these are made threadsafe using idle_add in the handler
        sink.connect('client-added', self._client_added_handler)

        # We now require a sufficiently recent multifdsink anyway that we can
        # use the new client-fd-removed signal
        sink.connect('client-fd-removed', self._client_fd_removed_cb)
        sink.connect('client-removed', self._client_removed_cb)

        if properties.has_key('client-limit'):
            limit = int(properties['client-limit'])
            self.resource.setUserLimit(limit)
            if limit != self.resource.maxclients:
                self.addMessage(
                    messages.Info(T_(N_("Unable to set the maximum "
                                        "client limit to %d clients."),
                                     limit),
                                  debug=("Your system has limited "
                                         "the ability to open file "
                                         "descriptors. Check your "
                                         "limits.conf to see how to "
                                         "raise this limit.")))

        if properties.has_key('bandwidth-limit'):
            limit = int(properties['bandwidth-limit'])
            if limit < 1000:
                # The wizard used to set this as being in Mbps, oops.
                self.debug("Bandwidth limit set to unreasonably low %d bps, "
                           "assuming this is meant to be Mbps", limit)
                limit *= 1000000
            self.resource.setBandwidthLimit(limit)

        if properties.has_key('redirect-on-overflow'):
            self.resource.setRedirectionOnLimits(
                properties['redirect-on-overflow'])

        if properties.has_key('bouncer'):
            self.httpauth.setBouncerName(properties['bouncer'])

        if properties.has_key('issuer-class'):
            self.httpauth.setIssuerClass(properties['issuer-class'])

        if properties.has_key('duration'):
            self.httpauth.setDefaultDuration(float(properties['duration']))

        if properties.has_key('domain'):
            self.httpauth.setDomain(properties['domain'])

        if self.config.has_key('avatarId'):
            self.httpauth.setRequesterId(self.config['avatarId'])

        if properties.has_key('ip-filter'):
            logFilter = http.LogFilter()
            for f in properties['ip-filter']:
                logFilter.addIPFilter(f)
            self.resource.setLogFilter(logFilter)

        self.type = properties.get('type', 'master')
        if self.type == 'slave':
            # already checked for these in do_check
            self._porterPath = properties['porter-socket-path']
            self._porterUsername = properties['porter-username']
            self._porterPassword = properties['porter-password']

        self.port = int(properties.get('port', 8800))

    def __repr__(self):
        return '<MultifdSinkStreamer (%s)>' % self.name

    def getMaxClients(self):
        return self.resource.maxclients

    def get_mime(self):
        if self.caps:
            return self.caps.get_structure(0).get_name()

    def get_content_type(self):
        mime = self.get_mime()
        if mime == 'multipart/x-mixed-replace':
            mime += ";boundary=ThisRandomString"
        return mime

    def getUrl(self):
        return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)

    def getStreamData(self):
        socket = 'flumotion.component.plugs.streamdata.StreamDataProvider'
        if self.plugs[socket]:
            plug = self.plugs[socket][-1]
            return plug.getStreamData()
        else:
            return {
                'protocol': 'HTTP',
                'description': self.description,
                'url': self.getUrl()
            }

    def getLoadData(self):
        """Return a tuple (deltaadded, deltaremoved, bytes_transferred,
        current_clients, current_load) of our current bandwidth and user values.
        The deltas are estimates of how much bitrate is added, removed
        due to client connections, disconnections, per second.
        """
        # We calculate the estimated clients added/removed per second, then
        # multiply by the stream bitrate
        deltaadded, deltaremoved = self.getLoadDeltas()

        bytes_received = self.getBytesReceived()
        uptime = self.getUptime()
        bitrate = bytes_received * 8 / uptime

        bytes_sent = self.getBytesSent()
        clients_connected = self.getClients()
        current_load = bitrate * clients_connected

        return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent,
                clients_connected, current_load)

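A quick sketch of the arithmetic behind getLoadData, using invented figures:

# Illustrative sketch only: getLoadData() arithmetic with made-up numbers
# (900 MB received over one hour of uptime, 50 clients connected).
bytes_received = 900 * 1000 * 1000
uptime = 3600.0
bitrate = bytes_received * 8 / uptime        # 2000000.0 bit/s
clients_connected = 50
current_load = bitrate * clients_connected   # 100000000.0 bit/s
deltaadded = 0.5                             # clients joining per second
print deltaadded * bitrate                   # 1000000.0 bit/s of expected added load
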
    def add_client(self, fd):
        sink = self.get_element('sink')
        sink.emit('add', fd)

    def remove_client(self, fd):
        sink = self.get_element('sink')
        sink.emit('remove', fd)

    def remove_all_clients(self):
        """Remove all the clients.

        Returns a deferred fired once all clients have been removed.
        """
        if self.resource:
            # can be None if we never went happy
            self.debug("Asking for all clients to be removed")
            return self.resource.removeAllClients()

    def update_ui_state(self):
        """Update the uiState object.
        Such updates (through this function) are throttled to a maximum rate,
        to avoid saturating admin clients with traffic when many clients are
        connecting/disconnecting.
        """
        def setIfChanged(k, v):
            if self.uiState.get(k) != v:
                self.uiState.set(k, v)

        def update_ui_state_later():
            self._updateUI_DC = None
            self.update_ui_state()

        now = time.time()

        # If we haven't updated too recently, do it immediately.
        if now - self._lastUpdate >= UI_UPDATE_THROTTLE_PERIOD:
            if self._updateUI_DC:
                self._updateUI_DC.cancel()
                self._updateUI_DC = None

            self._lastUpdate = now
            # fixme: have updateState just update what changed itself
            # without the hack above
            self.updateState(setIfChanged)
        elif not self._updateUI_DC:
            # Otherwise, schedule doing this in a few seconds (unless an update
            # was already scheduled)
            self._updateUI_DC = reactor.callLater(UI_UPDATE_THROTTLE_PERIOD,
                                                  update_ui_state_later)

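The throttling in update_ui_state is a reusable Twisted pattern: perform the update immediately when enough time has passed, otherwise arm at most one delayed call that performs it later. A minimal standalone sketch of that pattern (the class and names below are hypothetical, not part of Flumotion):

# Illustrative sketch only: throttling repeated updates with reactor.callLater,
# mirroring update_ui_state() above. Names here are hypothetical.
import time
from twisted.internet import reactor

class Throttled:
    period = 2.0

    def __init__(self):
        self._last = 0
        self._delayed = None

    def poke(self):
        now = time.time()
        if now - self._last >= self.period:
            if self._delayed:
                self._delayed.cancel()
                self._delayed = None
            self._last = now
            print 'expensive update at', now
        elif not self._delayed:
            def later():
                self._delayed = None
                self.poke()
            self._delayed = reactor.callLater(self.period, later)
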
    def _client_added_handler(self, sink, fd):
        self.log('[fd %5d] client_added_handler', fd)
        Stats.clientAdded(self)
        self.update_ui_state()

    def _client_removed_handler(self, sink, fd, reason, stats):
        self.log('[fd %5d] client_removed_handler, reason %s', fd, reason)
        if reason.value_name == 'GST_CLIENT_STATUS_ERROR':
            self.warning('[fd %5d] Client removed because of write error' % fd)

        self.resource.clientRemoved(sink, fd, reason, stats)
        Stats.clientRemoved(self)
        self.update_ui_state()

    ### START OF THREAD-AWARE CODE (called from non-reactor threads)

    def _notify_caps_cb(self, element, pad, param):
        caps = pad.get_negotiated_caps()
        if caps == None:
            return

        caps_str = gstreamer.caps_repr(caps)
        self.debug('Got caps: %s' % caps_str)

        if not self.caps == None:
            self.warning('Already had caps: %s, replacing' % caps_str)

        self.debug('Storing caps: %s' % caps_str)
        self.caps = caps

        reactor.callFromThread(self.update_ui_state)

    # We now use both client-removed and client-fd-removed. We call get-stats
    # from the first callback ('client-removed'), but don't actually start
    # removing the client until we get 'client-fd-removed'. This ensures that
    # there's no window in which multifdsink still knows about the fd, but
    # we've actually closed it, so we no longer get spurious duplicates.
    # this can be called from both application and streaming thread !
    def _client_removed_cb(self, sink, fd, reason):
        stats = sink.emit('get-stats', fd)
        self._pending_removals[fd] = (stats, reason)

    # this can be called from both application and streaming thread !
    def _client_fd_removed_cb(self, sink, fd):
        (stats, reason) = self._pending_removals.pop(fd)

        reactor.callFromThread(self._client_removed_handler, sink, fd,
                               reason, stats)

    ### END OF THREAD-AWARE CODE

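The thread-aware section above follows the standard recipe for GStreamer signal handlers under Twisted: do only thread-safe bookkeeping in the callback itself, then push the real work onto the reactor thread with reactor.callFromThread. A minimal sketch of that hand-off with hypothetical handler names:

# Illustrative sketch only: marshalling work from a non-reactor thread onto
# the Twisted reactor with callFromThread. Handler names are hypothetical.
from twisted.internet import reactor

def _signal_cb(sink, fd):
    # runs on a GStreamer thread: touch nothing reactor-owned here
    reactor.callFromThread(_handle_in_reactor, fd)

def _handle_in_reactor(fd):
    # runs on the reactor thread: safe to update state, resources and UI
    print 'handling fd', fd, 'in the reactor thread'
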
    def do_stop(self):
        if self._updateCallLaterId:
            self._updateCallLaterId.cancel()
            self._updateCallLaterId = None

        if self.httpauth:
            self.httpauth.stopKeepAlive()

        if self._tport:
            self._tport.stopListening()

        l = []
        # After we stop listening (so new connections aren't possible),
        # disconnect (and thus log) all the old ones.
        clients = self.remove_all_clients()
        if clients:
            l.append(clients)

        if self.type == 'slave' and self._pbclient:
            l.append(self._pbclient.deregisterPath(self.mountPoint))
        return defer.DeferredList(l)

    def updatePorterDetails(self, path, username, password):
        """Provide a new set of porter login information, for when we're
        in slave mode and the porter changes.
        If we're currently connected, this won't disconnect - it'll just change
        the information so that next time we try and connect we'll use the
        new ones
        """
        if self.type == 'slave':
            self._porterUsername = username
            self._porterPassword = password

            creds = credentials.UsernamePassword(self._porterUsername,
                                                 self._porterPassword)

            self._pbclient.startLogin(creds, self._pbclient.medium)

            # If we've changed paths, we must do some extra work.
            if path != self._porterPath:
                self.debug("Changing porter login to use \"%s\"", path)
                self._porterPath = path
                self._pbclient.stopTrying() # Stop trying to connect with the
                                            # old connector.
                self._pbclient.resetDelay()
                reactor.connectWith(
                    fdserver.FDConnector, self._porterPath,
                    self._pbclient, 10, checkPID=False)
        else:
            raise errors.WrongStateError(
                "Can't specify porter details in master mode")

    def do_pipeline_playing(self):
        # Override this to not set the component happy; instead do this once
        # both the pipeline has started AND we've logged in to the porter.
        if hasattr(self, '_porterDeferred'):
            d = self._porterDeferred
        else:
            d = defer.succeed(None)
        self.httpauth.scheduleKeepAlive()
        d.addCallback(lambda res:
            feedcomponent.ParseLaunchComponent.do_pipeline_playing(self))
        return d

    def do_setup(self):
        root = resources.HTTPRoot()
        # TwistedWeb wants the child path to not include the leading /
        mount = self.mountPoint[1:]
        root.putChild(mount, self.resource)
        if self.type == 'slave':
            # Streamer is slaved to a porter.

            # We have two things we want to do in parallel:
            #  - ParseLaunchComponent.do_start()
            #  - log in to the porter, then register our mountpoint with
            #    the porter.
            # So, we return a DeferredList with a deferred for each of
            # these tasks. The second one's a bit tricky: we pass a dummy
            # deferred to our PorterClientFactory that gets fired once
            # we've done all of the tasks the first time (it's an
            # automatically-reconnecting client factory, and we only fire
            # this deferred the first time)

            self._porterDeferred = d = defer.Deferred()
            mountpoints = [self.mountPoint]
            self._pbclient = porterclient.HTTPPorterClientFactory(
                server.Site(resource=root), mountpoints, d)

            creds = credentials.UsernamePassword(self._porterUsername,
                                                 self._porterPassword)
            self._pbclient.startLogin(creds, self._pbclient.medium)

            self.debug("Starting porter login at \"%s\"", self._porterPath)
            # This will eventually cause d to fire
            reactor.connectWith(
                fdserver.FDConnector, self._porterPath,
                self._pbclient, 10, checkPID=False)
        else:
            # Streamer is standalone.
            try:
                self.debug('Listening on %d' % self.port)
                iface = self.iface or ""
                self._tport = reactor.listenTCP(self.port,
                                                server.Site(resource=root),
                                                interface=iface)
            except error.CannotListenError:
                t = 'Port %d is not available.' % self.port
                self.warning(t)
                m = messages.Error(T_(N_(
                    "Network error: TCP port %d is not available."), self.port))
                self.addMessage(m)
                self.setMood(moods.sad)
                return defer.fail(errors.ComponentStartHandledError(t))
788