Package flumotion :: Package common :: Module worker
[hide private]

Source Code for Module flumotion.common.worker

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_common_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """objects related to the state of workers. 
 23  """ 
 24   
 25  import os 
 26  import signal 
 27   
 28  from twisted.spread import pb 
 29  from twisted.internet import protocol 
 30   
 31  from flumotion.common import log, errors, messages 
 32  from flumotion.common.i18n import N_, gettexter 
 33  from flumotion.twisted import flavors 
 34   
 35  __version__ = "$Rev: 6964 $" 
 36  T_ = gettexter() 
 37   
 38   
class ProcessProtocol(protocol.ProcessProtocol):
    """
    Process protocol that logs how a child process ended and, when the
    child was terminated by a signal, hands an error message describing
    that to L{sendMessage}.

    Subclasses have to implement L{sendMessage}; the version here only
    raises NotImplementedError.
    """

    def __init__(self, loggable, avatarId, processType, where):
        """
        @param loggable:    object whose info/warning/debug methods are
                            used for all logging done by this protocol
        @param avatarId:    stored on the instance unchanged
        @param processType: name for the kind of child, used in the
                            generated messages; e.g., 'component'
        @param where:       name for the place the child runs, used in
                            the generated messages; e.g., 'worker 1'
        """
        self.loggable = loggable
        self.avatarId = avatarId
        self.processType = processType
        self.where = where

        # no child is known yet
        self.setPid(None)

    def setPid(self, pid):
        """
        Remember the pid of the child process.

        @param pid: pid of the child, or None when there is no child
        """
        self.pid = pid

    def sendMessage(self, message):
        """
        Deliver a message about the child; to be implemented by
        subclasses.

        @param message: the message built by L{processEnded}
        """
        raise NotImplementedError

    def processEnded(self, status):
        """
        vmethod implementation; log how the child ended and report
        termination by a signal through L{sendMessage}.

        @param status: an instance of failure.Failure; status.value is a
                       twisted.internet.error.ProcessTerminated and
                       status.value.status is the os.WAIT-like status
                       value
        """
        logger = self.loggable

        # the pid is only used as text from here on; fall back to
        # "unknown" when we do not have one
        pidText = "unknown"
        if self.pid:
            pidText = str(self.pid)

        reason = status.value
        if reason.exitCode is not None:
            logger.info("Reaped child with pid %s, exit value %d.",
                        pidText, reason.exitCode)

        signum = reason.signal
        message = None

        # SIGKILL is an explicit kill, and never generates a core dump.
        # For any other signal we want to see if there is a core dump,
        # and warn if not.
        if signum == signal.SIGKILL:
            logger.warning("Child with pid %s killed.", pidText)
            message = messages.Error(
                T_(N_("The %s was killed.\n"), self.processType))
        elif signum is not None:
            message = messages.Error(
                T_(N_("The %s crashed.\n"), self.processType),
                debug='Terminated with signal number %d' % signum)

            # some signals get their own log line
            if signum == signal.SIGSEGV:
                logger.warning("Child with pid %s segfaulted.", pidText)
            elif signum == signal.SIGTRAP:
                # SIGTRAP occurs when registry is corrupt
                logger.warning("Child with pid %s received a SIGTRAP.",
                               pidText)
            else:
                # if we find any of these, possibly special-case them too
                logger.info("Reaped child with pid %s signaled by "
                            "signal %d.", pidText, signum)

            if os.WCOREDUMP(reason.status):
                logger.info("Core dumped.")
                corePath = os.path.join(os.getcwd(), 'core.%s' % pidText)
                if os.path.exists(corePath):
                    logger.info("Core file is probably '%s'." % corePath)
                    message.add(T_(N_(
                        "The core dump is '%s' on the host running '%s'."),
                        corePath, self.where))
                    # FIXME: add an action that runs gdb and produces a
                    # backtrace; or produce it here and attach to the
                    # message as debug info.
                    message.description = T_(N_(
                        "Learn how to analyze core dumps."))
                    message.section = 'chapter-debug'
                    message.anchor = 'section-os-analyze-core-dumps'
            else:
                logger.warning("No core dump generated. "
                               "Were core dumps enabled at the start ?")
                # NOTE: no description ("Learn how to enable core
                # dumps.") is attached in this case; that was left
                # disabled.
                message.add(T_(N_(
                    "However, no core dump was generated. "
                    "You may need to configure the environment "
                    "if you want to further debug this problem.")))

        if message:
            logger.debug('sending message to manager/admin')
            self.sendMessage(message)

        # the child is gone, so forget its pid
        self.setPid(None)
128
class PortSet(log.Loggable):
    """
    A list of ports that keeps track of which are available for use on a
    given machine.

    Usage is tracked in C{used}, a list parallel to C{ports} holding 1
    for a port that is in use and 0 for a free one.
    """
    # not very efficient: every operation scans the lists linearly

    def __init__(self, logName, ports, randomPorts=False):
        """
        @param logName:     name used for logging and in error messages
        @param ports:       the ports managed by this set
        @type  ports:       list of int
        @param randomPorts: when true, L{reservePorts} does not hand out
                            ports from C{ports} but returns zeros
        @type  randomPorts: bool
        """
        self.logName = logName
        self.ports = ports
        self.used = [0] * len(ports)
        self.random = randomPorts

    def reservePorts(self, numPorts):
        """
        Reserve a number of ports.

        In random mode nothing is tracked and a list of numPorts zeros
        is returned (presumably meaning "let the system choose"; confirm
        against the callers).

        Otherwise the lowest-indexed free ports are marked as used and
        returned.  The reservation is all-or-nothing: when fewer than
        numPorts ports are free, nothing is reserved before the error is
        raised, so a failed call does not leave ports marked as used.

        @param numPorts: number of ports to reserve
        @type  numPorts: int

        @rtype:   list of int
        @returns: the reserved ports
        @raises errors.ComponentStartError: when not enough ports are
                                            free
        """
        if numPorts <= 0:
            return []

        if self.random:
            return [0] * numPorts

        free = [i for i, inUse in enumerate(self.used) if not inUse]
        if numPorts > len(free):
            raise errors.ComponentStartError(
                'could not allocate port on worker %s' % self.logName)

        chosen = free[:numPorts]
        for i in chosen:
            self.used[i] = 1
        return [self.ports[i] for i in chosen]

    def setPortsUsed(self, ports):
        """
        Mark the given ports as used.

        Ports that are not part of this set, or that are already in use,
        are logged with a warning and otherwise ignored.

        @param ports: list of ports to mark as used
        @type  ports: list of int
        """
        for port in ports:
            try:
                i = self.ports.index(port)
            except ValueError:
                self.warning('portset does not include port %d', port)
            else:
                if self.used[i]:
                    self.warning('port %d already in use!', port)
                else:
                    self.used[i] = 1

    def releasePorts(self, ports):
        """
        Mark the given ports as free again.

        Ports that are not part of this set, or that are not in use,
        are logged with a warning and otherwise ignored.

        @param ports: list of ports to release
        @type  ports: list of int
        """
        for port in ports:
            try:
                i = self.ports.index(port)
            except ValueError:
                self.warning('releasing unknown port: %d', port)
            else:
                if self.used[i]:
                    self.used[i] = 0
                else:
                    self.warning('releasing unallocated port: %d', port)

    def numFree(self):
        """
        @rtype:   int
        @returns: the number of ports that are not in use
        """
        return len(self.ports) - self.numUsed()

    def numUsed(self):
        """
        @rtype:   int
        @returns: the number of ports that are in use
        """
        # a list comprehension rather than len(filter(...)), which only
        # works where filter returns a list
        return len([inUse for inUse in self.used if inUse])

# worker heaven state proxy objects
class ManagerWorkerHeavenState(flavors.StateCacheable):
    """
    The state of the worker heaven, as kept on the manager.

    Keys:

     - names (list):   names of the workers that we have state for
     - workers (list): list of L{ManagerWorkerState}
    """

    def __init__(self):
        flavors.StateCacheable.__init__(self)
        # both keys start out as empty lists; 'workers' should be a dict
        for listKey in ('names', 'workers'):
            self.addListKey(listKey, [])

    def __repr__(self):
        # show the raw state dictionary
        contents = self._dict
        return "%r" % contents
207
class AdminWorkerHeavenState(flavors.StateRemoteCache):
    """
    The state of the worker heaven, as seen in the admin.

    See L{ManagerWorkerHeavenState}.
    """


# register the admin-side class as the unjellyable counterpart of the
# manager-side state
pb.setUnjellyableForClass(ManagerWorkerHeavenState, AdminWorkerHeavenState)

class ManagerWorkerState(flavors.StateCacheable):
    """
    The state of a single worker, as kept in the manager.

    Keys:

     - name: name of the worker
     - host: the IP address of the worker as seen by the manager
    """

    def __init__(self, **kwargs):
        """
        @param kwargs: initial values; each keyword is set as a key on
                       this state
        """
        flavors.StateCacheable.__init__(self)
        for key in ('name', 'host'):
            self.addKey(key)
        for key in kwargs:
            self.set(key, kwargs[key])

    def __repr__(self):
        name = self.get('name')
        host = self.get('host')
        return "<ManagerWorkerState for %s on %s>" % (name, host)
234
class AdminWorkerState(flavors.StateRemoteCache):
    """
    The state of a single worker, as seen in the admin.

    See L{ManagerWorkerState}.
    """


# register the admin-side class as the unjellyable counterpart of the
# manager-side state
pb.setUnjellyableForClass(ManagerWorkerState, AdminWorkerState)