1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import socket
24 import time
25 import errno
26 import string
27 import resource
28 import fcntl
29
30 import gst
31
32 try:
33 from twisted.web import http
34 except ImportError:
35 from twisted.protocols import http
36
37 from twisted.web import server, resource as web_resource
38 from twisted.internet import reactor, defer
39 from twisted.python import reflect
40
41 from flumotion.configure import configure
42 from flumotion.common import errors
43
44 from flumotion.common import common, log, keycards
45
46 from flumotion.component.base import http as httpbase
47
48 __all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer']
49 __version__ = "$Rev: 6628 $"
50
51 HTTP_NAME = 'FlumotionHTTPServer'
52 HTTP_VERSION = configure.version
53
54 ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN">
55 <html>
56 <head>
57 <title>%(code)d %(error)s</title>
58 </head>
59 <body>
60 <h2>%(code)d %(error)s</h2>
61 </body>
62 </html>
63 """
64
65 HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION)
66
67
69
70 __reserve_fds__ = 50
71
72 logCategory = 'httpstreamer'
73
74
75
76
77 isLeaf = True
78
80 """
81 @param streamer: L{MultifdSinkStreamer}
82 """
83 self.streamer = streamer
84 self.httpauth = httpauth
85
86 self._requests = {}
87
88 self.maxclients = self.getMaxAllowedClients(-1)
89 self.maxbandwidth = -1
90
91
92 self._redirectOnFull = None
93
94 self._removing = {}
95
96 self.loggers = \
97 streamer.plugs['flumotion.component.plugs.loggers.Logger']
98
99 self.logfilter = None
100
101 web_resource.Resource.__init__(self)
102
104
105
106 if fd in self._requests:
107 request = self._requests[fd]
108 self._removeClient(request, fd, stats)
109 else:
110 self.warning('[fd %5d] not found in _requests' % fd)
111
113 """
114 Start to remove all the clients connected (this will complete
115 asynchronously from another thread)
116
117 Returns a deferred that will fire once they're all removed.
118 """
119 l = []
120 for fd in self._requests:
121 self._removing[fd] = defer.Deferred()
122 l.append(self._removing[fd])
123 self.streamer.remove_client(fd)
124
125 return defer.DeferredList(l)
126
128 self.putChild(path, self)
129
131 self.logfilter = logfilter
132
134 """
135 Close the logfile, then reopen using the previous logfilename
136 """
137 for logger in self.loggers:
138 self.debug('rotating logger %r' % logger)
139 logger.rotate()
140
def logWrite(self, fd, ip, request, stats):
    """
    Report a finished HTTP session to every logger in self.loggers.

    Each logger receives an 'http_session_completed' event carrying a
    dictionary that describes the request and the transfer.

    @param fd:      the file descriptor of the client (not used here)
    @param ip:      the address of the client, stored under the 'ip' key
    @param request: the request that was served
    @param stats:   the statistics for the client; when empty or None,
                    'bytes-sent' and 'time-connected' are reported as -1.
                    stats[0] is used as the byte count and stats[3] is
                    divided by gst.SECOND to get the connection time
                    (presumably nanoseconds to seconds - TODO confirm).

    @returns: a DeferredList grouping the result of every logger call.
    """
    # -1 marks a value that is unknown because no statistics were given
    bytes_sent = time_connected = -1
    if stats:
        bytes_sent = stats[0]
        time_connected = int(stats[3] / gst.SECOND)

    requestHeaders = request.getAllHeaders()

    event = {
        'ip': ip,
        'time': time.gmtime(),
        'method': request.method,
        'uri': request.uri,
        'username': '-',
        'get-parameters': request.args,
        'clientproto': request.clientproto,
        'response': request.code,
        'bytes-sent': bytes_sent,
        'referer': requestHeaders.get('referer', None),
        'user-agent': requestHeaders.get('user-agent', None),
        'time-connected': time_connected,
    }

    # maybeDeferred lets a logger answer synchronously or with a deferred
    pending = [
        defer.maybeDeferred(logger.event, 'http_session_completed', event)
        for logger in self.loggers]

    return defer.DeferredList(pending)
171
173 self.info('setting maxclients to %d' % limit)
174 self.maxclients = self.getMaxAllowedClients(limit)
175
176 self.info('set maxclients to %d' % self.maxclients)
177
179 self.maxbandwidth = limit
180 self.info("set maxbandwidth to %d", self.maxbandwidth)
181
183 self._redirectOnFull = url
184
185
187 """
188 Write out the HTTP headers for the incoming HTTP request.
189
190 @rtype: boolean
191 @returns: whether or not the file descriptor can be used further.
192 """
193 fd = request.transport.fileno()
194 fdi = request.fdIncoming
195
196
197 if fd == -1:
198 self.info('[fd %5d] Client gone before writing header' % fdi)
199
200 return False
201 if fd != request.fdIncoming:
202 self.warning('[fd %5d] does not match current fd %d' % (fdi, fd))
203
204 return False
205
206 headers = []
207
208 def setHeader(field, name):
209 headers.append('%s: %s\r\n' % (field, name))
210
211
212 content = self.streamer.get_content_type()
213 setHeader('Server', HTTP_SERVER)
214 setHeader('Date', http.datetimeToString())
215 setHeader('Cache-Control', 'no-cache')
216 setHeader('Cache-Control', 'private')
217 setHeader('Content-type', content)
218
219
220
221
222
223
224
225
226
227
228
229
230
231 try:
232
233
234
235
236 os.write(fd, 'HTTP/1.0 200 OK\r\n%s\r\n' % ''.join(headers))
237
238 request.startedWriting = True
239 return True
240 except OSError, (no, s):
241 if no == errno.EBADF:
242 self.info('[fd %5d] client gone before writing header' % fd)
243 elif no == errno.ECONNRESET:
244 self.info('[fd %5d] client reset connection writing header' % fd)
245 else:
246 self.info('[fd %5d] unhandled write error when writing header: %s' % (fd, s))
247
248 del request
249 return False
250
def isReady(self):
    """
    Tell whether the streamer can serve clients yet.

    @rtype:   boolean
    @returns: False (after logging a debug message) while the streamer's
              caps attribute is None, True otherwise.
    """
    # identity test: an equality test would be delegated to the caps
    # object's own comparison, which may not treat None sensibly
    if self.streamer.caps is None:
        self.debug('We have no caps yet')
        return False

    return True
257
def getMaxAllowedClients(self, maxclients):
    """
    maximum number of allowed clients based on soft limit for number of
    open file descriptors and fd reservation. Increases soft limit to
    hard limit if possible.

    @param maxclients: the wanted number of clients, or -1 to get as
                       many as the current soft limit allows
    @type  maxclients: int

    @rtype:   int
    @returns: the number of clients that can be served once
              __reserve_fds__ descriptors are set aside.
    """
    import sys

    (softmax, hardmax) = resource.getrlimit(resource.RLIMIT_NOFILE)

    if maxclients == -1:
        return softmax - self.__reserve_fds__

    neededfds = maxclients + self.__reserve_fds__

    # on python 2.4.3 without the socket patch the hard limit is
    # clamped to 1024
    if sys.version_info[:3] == (2, 4, 3) and \
            not hasattr(socket, "has_2_4_3_patch"):
        self.warning(
            'Setting hardmax to 1024 due to python 2.4.3 bug')
        hardmax = 1024

    if neededfds <= softmax:
        # the current soft limit already leaves enough room
        return maxclients

    # RLIM_INFINITY can be a negative number, so it must not take part
    # in min(): that would produce a negative limit
    if hardmax == resource.RLIM_INFINITY:
        lim = neededfds
    else:
        lim = min(neededfds, hardmax)
    resource.setrlimit(resource.RLIMIT_NOFILE, (lim, hardmax))
    return lim - self.__reserve_fds__
286
def reachedServerLimits(self):
    """
    Tell whether accepting one more client would exceed a limit.

    A negative maxclients or maxbandwidth disables that limit.

    @rtype:   boolean
    @returns: True when the number of tracked requests has reached
              maxclients, or when the current bitrate multiplied by
              the number of clients plus one reaches maxbandwidth.
    """
    clients = len(self._requests)

    if 0 <= self.maxclients <= clients:
        return True

    if self.maxbandwidth < 0:
        return False

    # bandwidth needed if the new client were accepted as well
    projected = (clients + 1) * self.streamer.getCurrentBitrate()
    return projected >= self.maxbandwidth
296
def _addClient(self, request):
    """
    Remember a request under the file descriptor of its transport, so
    it can be used for statistics.

    @param request: the request
    @type  request: twisted.protocol.http.Request
    """
    self._requests[request.transport.fileno()] = request
307
def _logRequestFromIP(self, ip):
    """
    Returns whether we want to log a request from this IP; allows us to
    filter requests from automated monitoring systems.

    @param ip: the address of the client

    @rtype:   boolean
    @returns: True when no log filter is set, otherwise True only for
              addresses the filter does not report as being in range.
    """
    ipFilter = self.logfilter
    if not ipFilter:
        return True
    return not ipFilter.isInRange(ip)
317
def _removeClient(self, request, fd, stats):
    """
    Forget a request and log the end of its session.

    Note that this is called in reaction to a client disconnecting.
    The session is logged unless the client address is filtered out,
    the authentication state for the descriptor is cleaned up, the
    request is dropped from the bookkeeping and its transport is closed.

    @param request: the request
    @type  request: L{twisted.protocols.http.Request}
    @param fd:      the file descriptor for the client being removed
    @type  fd:      L{int}
    @param stats:   the statistics for the removed client
    @type  stats:   GValueArray

    @returns: a deferred that fires once logging is done; by then any
              deferred registered for this fd in self._removing has
              been fired as well.
    """
    ip = request.getClientIP()

    if not self._logRequestFromIP(ip):
        # nothing to log, so there is nothing to wait for either
        logged = defer.succeed(True)
    else:
        logged = self.logWrite(fd, ip, request, stats)
    self.info('[fd %5d] Client from %s disconnected' % (fd, ip))

    self.httpauth.cleanupAuth(fd)

    self.debug('[fd %5d] closing transport %r' % (fd, request.transport))

    # drop our bookkeeping before the transport goes away
    del self._requests[fd]
    request.transport.loseConnection()

    self.debug('[fd %5d] closed transport %r' % (fd, request.transport))

    def fireRemoval(result):
        # only descriptors with a pending entry in _removing have a
        # deferred waiting for them
        if fd not in self._removing:
            return
        self.debug("client is removed; firing deferred")
        self._removing.pop(fd).callback(None)

    logged.addCallback(fireRemoval)
    return logged
361
373
374
375
376
def _render(self, request):
    """
    Handle an incoming request; used for both GET and HEAD.

    The descriptor of the transport is recorded on the request as
    fdIncoming. A request is answered right away when the streamer is
    not ready or a server limit is reached; otherwise authentication is
    started and the request is completed from its callback.

    @param request: the incoming request

    @returns: the result of the not-ready or server-full handler, or
              server.NOT_DONE_YET when authentication was started.
    """
    incoming = request.transport.fileno()

    # remembered so later steps can compare against the descriptor the
    # request arrived on
    request.fdIncoming = incoming

    self.info('[fd %5d] Incoming client connection from %s' % (
        incoming, request.getClientIP()))
    self.debug('[fd %5d] _render(): request %s' % (
        incoming, request))

    if not self.isReady():
        return self._handleNotReady(request)
    if self.reachedServerLimits():
        return self._handleServerFull(request)

    self.debug('_render(): asked for (possible) authentication')
    authD = self.httpauth.startAuthentication(request)
    authD.addCallback(self.handleAuthenticatedRequest, request)

    def swallow(failure):
        # failures of the chain are deliberately not propagated
        return None

    authD.addErrback(swallow)

    return server.NOT_DONE_YET
402
def _handleNotReady(self, request):
    """
    Answer a request that arrives while the streamer is not ready.

    Nothing is written to the client; only a debug message is logged.

    @param request: the incoming request (not used)

    @returns: server.NOT_DONE_YET
    """
    self.debug("Not sending data, it's not ready")
    return server.NOT_DONE_YET
406
def _handleServerFull(self, request):
    """
    Turn away a client because a server limit has been reached.

    When a redirection URL is configured the client is redirected to it
    (302 Found); otherwise it is refused with 503 Service Unavailable.

    @param request: the request to answer

    @rtype:   str
    @returns: the HTML error page to use as the response body.
    """
    if self._redirectOnFull:
        self.debug("Redirecting client, client limit %d reached",
            self.maxclients)
        error_code = http.FOUND
        request.setHeader('location', self._redirectOnFull)
    else:
        self.debug('Refusing clients, client limit %d reached' %
            self.maxclients)
        error_code = http.SERVICE_UNAVAILABLE

    request.setHeader('content-type', 'text/html')

    # identify ourselves as "name/version", the same value the
    # streaming response sends in its Server header
    request.setHeader('server', HTTP_SERVER)
    request.setResponseCode(error_code)

    return ERROR_TEMPLATE % {'code': error_code,
                             'error': http.RESPONSES[error_code]}
425
427
428 fdi = request.fdIncoming
429 if not self._writeHeaders(request):
430 self.debug("[fd %5d] not adding as a client" % fdi)
431 return
432 self._addClient(request)
433
434
435
436
437
438
439
440
441 fd = fdi
442 self.debug("taking away [fd %5d] from Twisted" % fd)
443 reactor.removeReader(request.transport)
444
445
446
447
448 try:
449 fcntl.fcntl(fd, fcntl.F_GETFL)
450 except IOError, e:
451 if e.errno == errno.EBADF:
452 self.warning("[fd %5d] is not actually open, ignoring" % fd)
453 else:
454 self.warning("[fd %5d] error during check: %s (%d)" % (
455 fd, e.strerror, e.errno))
456 return
457
458
459 self.streamer.add_client(fd)
460 ip = request.getClientIP()
461
462 self.info('[fd %5d] Started streaming to %s' % (fd, ip))
463
464 render_GET = _render
465 render_HEAD = _render
466
class HTTPRoot(web_resource.Resource, log.Loggable):
    """
    Root resource that looks up its children by the complete request
    path instead of one path segment at a time.
    """
    logCategory = "httproot"

    def getChildWithDefault(self, path, request):
        """
        Look up the child resource registered under the full path.

        The remaining path segments in request.postpath are appended to
        path, separated by '/', before delegating to the base class.

        @param path:    the first segment of the requested path
        @param request: the incoming request

        @returns: whatever Resource.getChildWithDefault returns for the
                  joined path.
        """
        fullPath = path
        if request.postpath:
            # str.join replaces the deprecated string.join function
            fullPath += '/' + '/'.join(request.postpath)
        self.debug("Incoming request %r for path %s" % (request, fullPath))
        r = web_resource.Resource.getChildWithDefault(self, fullPath, request)
        self.debug("Returning resource %r" % r)
        return r
481