Package flumotion :: Package common :: Module worker
[hide private]

Source Code for Module flumotion.common.worker

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_common_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Objects related to the state of workers. 
 24  """ 
 25   
 26  import os 
 27  import signal 
 28   
 29  from twisted.spread import pb 
 30  from twisted.internet import protocol 
 31   
 32  from flumotion.twisted import flavors 
 33  from flumotion.common import log, errors, messages 
 34   
 35  from flumotion.common.messages import N_ 
 36  T_ = messages.gettexter('flumotion') 
 37   
class ProcessProtocol(protocol.ProcessProtocol):
    """
    I am a process protocol that reports how a child process ended.

    When the child ends I log the outcome through my loggable and, if the
    child was terminated by a signal, I build a L{messages.Error} describing
    the termination and hand it to L{sendMessage}.

    @ivar loggable:    object whose info/warning/debug methods are used
                       for logging
    @ivar avatarId:    stored as given; not used by this class itself
    @ivar processType: kind of process, e.g. 'component'; interpolated into
                       the error messages
    @ivar where:       where the process runs, e.g. 'worker 1'; interpolated
                       into the core dump message
    @ivar pid:         pid of the child, or None when not set
    """

    def __init__(self, loggable, avatarId, processType, where):
        """
        @param loggable:    object to log through
        @param avatarId:    stored on the instance as-is
        @param processType: e.g., 'component'
        @param where:       e.g., 'worker 1'
        """
        self.loggable = loggable
        self.avatarId = avatarId
        self.processType = processType
        self.where = where

        # no child is known yet
        self.setPid(None)

    def setPid(self, pid):
        """
        Remember the pid of the child process.

        @param pid: the pid of the child, or None to forget it
        """
        self.pid = pid

    def sendMessage(self, message):
        """
        Deliver a message about the ended process.

        This base implementation always raises; processEnded calls it
        whenever it has built a message, so subclasses need to override it.

        @param message: the message built by L{processEnded}

        @raise NotImplementedError: always
        """
        raise NotImplementedError

    def processEnded(self, status):
        """
        Log how the child ended and report signal terminations.

        @param status: the reason the process ended
        """
        # vmethod implementation
        # status is an instance of failure.Failure
        # status.value is a twisted.internet.error.ProcessTerminated
        # status.value.status is the os.WAIT-like status value
        logger = self.loggable

        # textual pid for the log output; "unknown" when we have no pid
        pidText = "unknown"
        if self.pid:
            pidText = str(self.pid)

        if status.value.exitCode is not None:
            logger.info("Reaped child with pid %s, exit value %d.",
                pidText, status.value.exitCode)

        report = None
        signalNumber = status.value.signal

        if signalNumber == signal.SIGKILL:
            # SIGKILL is an explicit kill, and never generates a core dump,
            # so there is nothing to look for.
            logger.warning("Child with pid %s killed.", pidText)
            report = messages.Error(T_(N_("The %s was killed.\n"),
                self.processType))
        elif signalNumber is not None:
            # For any other signal we want to see if there is a core dump,
            # and warn if not.
            report = messages.Error(
                T_(N_("The %s crashed.\n"), self.processType),
                debug='Terminated with signal number %d' % signalNumber)

            # use some custom logging depending on signal
            if signalNumber == signal.SIGSEGV:
                logger.warning("Child with pid %s segfaulted.", pidText)
            elif signalNumber == signal.SIGTRAP:
                # SIGTRAP occurs when registry is corrupt
                logger.warning("Child with pid %s received a SIGTRAP.",
                    pidText)
            else:
                # if we find any of these, possibly special-case them too
                logger.info("Reaped child with pid %s signaled by "
                    "signal %d.", pidText, signalNumber)

            if os.WCOREDUMP(status.value.status):
                logger.info("Core dumped.")
                corePath = os.path.join(os.getcwd(), 'core.%s' % pidText)
                if os.path.exists(corePath):
                    logger.info("Core file is probably '%s'." % corePath)
                    report.add(T_(N_(
                        "The core dump is '%s' on the host running '%s'."),
                        corePath, self.where))
                    # FIXME: add an action that runs gdb and produces a
                    # backtrace; or produce it here and attach to the
                    # message as debug info.
            else:
                logger.warning("No core dump generated. "
                    "Were core dumps enabled at the start ?")
                report.add(T_(N_(
                    "However, no core dump was generated. "
                    "You may need to configure the environment "
                    "if you want to further debug this problem.")))

        if report:
            logger.debug('sending message to manager/admin')
            self.sendMessage(report)

        # the child is gone, so its pid is no longer valid
        self.setPid(None)
121
class PortSet(log.Loggable):
    """
    A list of ports that keeps track of which are available for use on a
    given machine.

    @ivar logName: name used in the allocation error message
    @ivar ports:   the ports managed by this set
    @ivar used:    list parallel to ports; 1 when the port at the same
                   index is in use, 0 when it is free
    """
    # not very efficient mkay

    def __init__(self, logName, ports):
        """
        @param logName: name identifying this port set
        @param ports:   the ports to manage; all start out free
        @type  ports:   list of int
        """
        self.logName = logName
        self.ports = ports
        self.used = [0] * len(ports)

    def reservePorts(self, numPorts):
        """
        Reserve the first numPorts free ports, in the order of the port list.

        Either all requested ports are reserved or none are.

        @param numPorts: number of ports to reserve
        @type  numPorts: int

        @returns: the reserved ports; empty when numPorts is not positive
        @rtype:   list of int

        @raise errors.ComponentStartError: if fewer than numPorts ports
                                           are free
        """
        # Check availability up front so that a failed request does not
        # leave ports marked as used that the caller never gets to know.
        if numPorts > self.numFree():
            raise errors.ComponentStartError(
                'could not allocate port on worker %s' % self.logName)

        ret = []
        for i, inUse in enumerate(self.used):
            if len(ret) >= numPorts:
                break
            if not inUse:
                self.used[i] = 1
                ret.append(self.ports[i])
        return ret

    def setPortsUsed(self, ports):
        """
        Mark the given ports as used.

        Ports that are not part of this set, or that are already in use,
        are skipped with a warning.

        @param ports: list of ports to mark as used
        @type  ports: list of int
        """
        for port in ports:
            try:
                i = self.ports.index(port)
            except ValueError:
                # list.index raises ValueError for a missing item
                self.warning('portset does not include port %d', port)
            else:
                if self.used[i]:
                    self.warning('port %d already in use!', port)
                else:
                    self.used[i] = 1

    def releasePorts(self, ports):
        """
        Mark the given ports as free again.

        Ports that are not part of this set, or that are not in use,
        are skipped with a warning.

        @param ports: list of ports to release
        @type  ports: list of int
        """
        for p in ports:
            try:
                i = self.ports.index(p)
            except ValueError:
                self.warning('releasing unknown port: %d' % p)
            else:
                if self.used[i]:
                    self.used[i] = 0
                else:
                    self.warning('releasing unallocated port: %d' % p)

    def numFree(self):
        """
        @returns: the number of ports that are currently free
        @rtype:   int
        """
        return len(self.ports) - self.numUsed()

    def numUsed(self):
        """
        @returns: the number of ports that are currently in use
        @rtype:   int
        """
        # build a real list so len() works whether or not filter() would
        # return a list
        return len([flag for flag in self.used if flag])
# worker heaven state proxy objects
class ManagerWorkerHeavenState(flavors.StateCacheable):
    """
    I represent the state of the worker heaven on the manager.

    I have the following keys:

     - names (list):   list of worker names that we have state for
     - workers (list): list of L{ManagerWorkerState}
    """

    def __init__(self):
        """
        Register the 'names' and 'workers' list keys, both initially empty.
        """
        flavors.StateCacheable.__init__(self)
        # 'workers' should be a dict
        for listKey in ('names', 'workers'):
            # a fresh empty list is created for every key
            self.addListKey(listKey, [])

    def __repr__(self):
        """
        @returns: the repr of my _dict attribute
        @rtype:   str
        """
        state = self._dict
        return "%r" % state
195
class AdminWorkerHeavenState(flavors.StateRemoteCache):
    """
    I represent the state of the worker heaven in the admin.

    I am registered below as the unjellyable for
    L{ManagerWorkerHeavenState}; see that class for my keys.
    """


# register the admin-side class as the unjellyable for the manager-side one
pb.setUnjellyableForClass(ManagerWorkerHeavenState, AdminWorkerHeavenState)
class ManagerWorkerState(flavors.StateCacheable):
    """
    I represent the state of a worker in the manager.

    I have the following keys:

     - name: name of the worker
     - host: the IP address of the worker as seen by the manager
    """

    def __init__(self, **kwargs):
        """
        Register the 'name' and 'host' keys, then set every keyword
        argument as a key/value pair on the state.

        @param kwargs: initial values, set through self.set()
        """
        flavors.StateCacheable.__init__(self)
        self.addKey('name')
        self.addKey('host')
        for key in kwargs:
            self.set(key, kwargs[key])

    def __repr__(self):
        """
        @returns: a short description showing my name and host
        @rtype:   str
        """
        name = self.get('name')
        host = self.get('host')
        return "<ManagerWorkerState for %s on %s>" % (name, host)
222
class AdminWorkerState(flavors.StateRemoteCache):
    """
    I represent the state of a worker in the admin.

    I am registered below as the unjellyable for L{ManagerWorkerState};
    see that class for my keys.
    """


# register the admin-side class as the unjellyable for the manager-side one
pb.setUnjellyableForClass(ManagerWorkerState, AdminWorkerState)