root/flup/client/fcgi_app.py

Revision 44:059ee7930bbe, 14.4 kB (checked in by Allan Saddi <allan@saddi.com>, 2 years ago)

Add FCGIApp and SCGIApp from sandbox.

Line 
1 # Copyright (c) 2006 Allan Saddi <allan@saddi.com>
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions
6 # are met:
7 # 1. Redistributions of source code must retain the above copyright
8 #    notice, this list of conditions and the following disclaimer.
9 # 2. Redistributions in binary form must reproduce the above copyright
10 #    notice, this list of conditions and the following disclaimer in the
11 #    documentation and/or other materials provided with the distribution.
12 #
13 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23 # SUCH DAMAGE.
24 #
25 # $Id$
26
27 __author__ = 'Allan Saddi <allan@saddi.com>'
28 __version__ = '$Revision$'
29
30 import select
31 import struct
32 import socket
33 import errno
34
35 __all__ = ['FCGIApp']
36
37 # Constants from the spec.
38 FCGI_LISTENSOCK_FILENO = 0
39
40 FCGI_HEADER_LEN = 8
41
42 FCGI_VERSION_1 = 1
43
44 FCGI_BEGIN_REQUEST = 1
45 FCGI_ABORT_REQUEST = 2
46 FCGI_END_REQUEST = 3
47 FCGI_PARAMS = 4
48 FCGI_STDIN = 5
49 FCGI_STDOUT = 6
50 FCGI_STDERR = 7
51 FCGI_DATA = 8
52 FCGI_GET_VALUES = 9
53 FCGI_GET_VALUES_RESULT = 10
54 FCGI_UNKNOWN_TYPE = 11
55 FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
56
57 FCGI_NULL_REQUEST_ID = 0
58
59 FCGI_KEEP_CONN = 1
60
61 FCGI_RESPONDER = 1
62 FCGI_AUTHORIZER = 2
63 FCGI_FILTER = 3
64
65 FCGI_REQUEST_COMPLETE = 0
66 FCGI_CANT_MPX_CONN = 1
67 FCGI_OVERLOADED = 2
68 FCGI_UNKNOWN_ROLE = 3
69
70 FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
71 FCGI_MAX_REQS = 'FCGI_MAX_REQS'
72 FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'
73
74 FCGI_Header = '!BBHHBx'
75 FCGI_BeginRequestBody = '!HB5x'
76 FCGI_EndRequestBody = '!LB3x'
77 FCGI_UnknownTypeBody = '!B7x'
78
79 FCGI_BeginRequestBody_LEN = struct.calcsize(FCGI_BeginRequestBody)
80 FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
81 FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
82
83 if __debug__:
84     import time
85
86     # Set non-zero to write debug output to a file.
87     DEBUG = 0
88     DEBUGLOG = '/tmp/fcgi_app.log'
89
90     def _debug(level, msg):
91         if DEBUG < level:
92             return
93
94         try:
95             f = open(DEBUGLOG, 'a')
96             f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
97             f.close()
98         except:
99             pass
100
101 def decode_pair(s, pos=0):
102     """
103     Decodes a name/value pair.
104
105     The number of bytes decoded as well as the name/value pair
106     are returned.
107     """
108     nameLength = ord(s[pos])
109     if nameLength & 128:
110         nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
111         pos += 4
112     else:
113         pos += 1
114
115     valueLength = ord(s[pos])
116     if valueLength & 128:
117         valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
118         pos += 4
119     else:
120         pos += 1
121
122     name = s[pos:pos+nameLength]
123     pos += nameLength
124     value = s[pos:pos+valueLength]
125     pos += valueLength
126
127     return (pos, (name, value))
128
129 def encode_pair(name, value):
130     """
131     Encodes a name/value pair.
132
133     The encoded string is returned.
134     """
135     nameLength = len(name)
136     if nameLength < 128:
137         s = chr(nameLength)
138     else:
139         s = struct.pack('!L', nameLength | 0x80000000L)
140
141     valueLength = len(value)
142     if valueLength < 128:
143         s += chr(valueLength)
144     else:
145         s += struct.pack('!L', valueLength | 0x80000000L)
146
147     return s + name + value
148
149 class Record(object):
150     """
151     A FastCGI Record.
152
153     Used for encoding/decoding records.
154     """
155     def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
156         self.version = FCGI_VERSION_1
157         self.type = type
158         self.requestId = requestId
159         self.contentLength = 0
160         self.paddingLength = 0
161         self.contentData = ''
162
163     def _recvall(sock, length):
164         """
165         Attempts to receive length bytes from a socket, blocking if necessary.
166         (Socket may be blocking or non-blocking.)
167         """
168         dataList = []
169         recvLen = 0
170         while length:
171             try:
172                 data = sock.recv(length)
173             except socket.error, e:
174                 if e[0] == errno.EAGAIN:
175                     select.select([sock], [], [])
176                     continue
177                 else:
178                     raise
179             if not data: # EOF
180                 break
181             dataList.append(data)
182             dataLen = len(data)
183             recvLen += dataLen
184             length -= dataLen
185         return ''.join(dataList), recvLen
186     _recvall = staticmethod(_recvall)
187
188     def read(self, sock):
189         """Read and decode a Record from a socket."""
190         try:
191             header, length = self._recvall(sock, FCGI_HEADER_LEN)
192         except:
193             raise EOFError
194
195         if length < FCGI_HEADER_LEN:
196             raise EOFError
197        
198         self.version, self.type, self.requestId, self.contentLength, \
199                       self.paddingLength = struct.unpack(FCGI_Header, header)
200
201         if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
202                              'contentLength = %d' %
203                              (sock.fileno(), self.type, self.requestId,
204                               self.contentLength))
205        
206         if self.contentLength:
207             try:
208                 self.contentData, length = self._recvall(sock,
209                                                          self.contentLength)
210             except:
211                 raise EOFError
212
213             if length < self.contentLength:
214                 raise EOFError
215
216         if self.paddingLength:
217             try:
218                 self._recvall(sock, self.paddingLength)
219             except:
220                 raise EOFError
221
222     def _sendall(sock, data):
223         """
224         Writes data to a socket and does not return until all the data is sent.
225         """
226         length = len(data)
227         while length:
228             try:
229                 sent = sock.send(data)
230             except socket.error, e:
231                 if e[0] == errno.EAGAIN:
232                     select.select([], [sock], [])
233                     continue
234                 else:
235                     raise
236             data = data[sent:]
237             length -= sent
238     _sendall = staticmethod(_sendall)
239
240     def write(self, sock):
241         """Encode and write a Record to a socket."""
242         self.paddingLength = -self.contentLength & 7
243
244         if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
245                              'contentLength = %d' %
246                              (sock.fileno(), self.type, self.requestId,
247                               self.contentLength))
248
249         header = struct.pack(FCGI_Header, self.version, self.type,
250                              self.requestId, self.contentLength,
251                              self.paddingLength)
252         self._sendall(sock, header)
253         if self.contentLength:
254             self._sendall(sock, self.contentData)
255         if self.paddingLength:
256             self._sendall(sock, '\x00'*self.paddingLength)
257
258 class FCGIApp(object):
259     def __init__(self, command=None, connect=None, host=None, port=None,
260                  filterEnviron=True):
261         if host is not None:
262             assert port is not None
263             connect=(host, port)
264
265         assert (command is not None and connect is None) or \
266                (command is None and connect is not None)
267
268         self._command = command
269         self._connect = connect
270
271         self._filterEnviron = filterEnviron
272        
273         #sock = self._getConnection()
274         #print self._fcgiGetValues(sock, ['FCGI_MAX_CONNS', 'FCGI_MAX_REQS', 'FCGI_MPXS_CONNS'])
275         #sock.close()
276        
277     def __call__(self, environ, start_response):
278         # For sanity's sake, we don't care about FCGI_MPXS_CONN
279         # (connection multiplexing). For every request, we obtain a new
280         # transport socket, perform the request, then discard the socket.
281         # This is, I believe, how mod_fastcgi does things...
282
283         sock = self._getConnection()
284
285         # Since this is going to be the only request on this connection,
286         # set the request ID to 1.
287         requestId = 1
288
289         # Begin the request
290         rec = Record(FCGI_BEGIN_REQUEST, requestId)
291         rec.contentData = struct.pack(FCGI_BeginRequestBody, FCGI_RESPONDER, 0)
292         rec.contentLength = FCGI_BeginRequestBody_LEN
293         rec.write(sock)
294
295         # Filter WSGI environ and send it as FCGI_PARAMS
296         if self._filterEnviron:
297             params = self._defaultFilterEnviron(environ)
298         else:
299             params = self._lightFilterEnviron(environ)
300         # TODO: Anything not from environ that needs to be sent also?
301         self._fcgiParams(sock, requestId, params)
302         self._fcgiParams(sock, requestId, {})
303
304         # Transfer wsgi.input to FCGI_STDIN
305         content_length = int(environ.get('CONTENT_LENGTH') or 0)
306         while True:
307             chunk_size = min(content_length, 4096)
308             s = environ['wsgi.input'].read(chunk_size)
309             content_length -= len(s)
310             rec = Record(FCGI_STDIN, requestId)
311             rec.contentData = s
312             rec.contentLength = len(s)
313             rec.write(sock)
314
315             if not s: break
316
317         # Empty FCGI_DATA stream
318         rec = Record(FCGI_DATA, requestId)
319         rec.write(sock)
320
321         # Main loop. Process FCGI_STDOUT, FCGI_STDERR, FCGI_END_REQUEST
322         # records from the application.
323         result = []
324         while True:
325             inrec = Record()
326             inrec.read(sock)
327             if inrec.type == FCGI_STDOUT:
328                 if inrec.contentData:
329                     result.append(inrec.contentData)
330                 else:
331                     # TODO: Should probably be pedantic and no longer
332                     # accept FCGI_STDOUT records?
333                     pass
334             elif inrec.type == FCGI_STDERR:
335                 # Simply forward to wsgi.errors
336                 environ['wsgi.errors'].write(inrec.contentData)
337             elif inrec.type == FCGI_END_REQUEST:
338                 # TODO: Process appStatus/protocolStatus fields?
339                 break
340
341         # Done with this transport socket, close it. (FCGI_KEEP_CONN was not
342         # set in the FCGI_BEGIN_REQUEST record we sent above. So the
343         # application is expected to do the same.)
344         sock.close()
345
346         result = ''.join(result)
347
348         # Parse response headers from FCGI_STDOUT
349         status = '200 OK'
350         headers = []
351         pos = 0
352         while True:
353             eolpos = result.find('\n', pos)
354             if eolpos < 0: break
355             line = result[pos:eolpos-1]
356             pos = eolpos + 1
357
358             # strip in case of CR. NB: This will also strip other
359             # whitespace...
360             line = line.strip()
361            
362             # Empty line signifies end of headers
363             if not line: break
364
365             # TODO: Better error handling
366             header, value = line.split(':', 1)
367             header = header.strip().lower()
368             value = value.strip()
369
370             if header == 'status':
371                 # Special handling of Status header
372                 status = value
373                 if status.find(' ') < 0:
374                     # Append a dummy reason phrase if one was not provided
375                     status += ' FCGIApp'
376             else:
377                 headers.append((header, value))
378
379         result = result[pos:]
380
381         # Set WSGI status, headers, and return result.
382         start_response(status, headers)
383         return [result]
384
385     def _getConnection(self):
386         if self._connect is not None:
387             # The simple case. Create a socket and connect to the
388             # application.
389             if type(self._connect) is str:
390                 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
391             else:
392                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
393             sock.connect(self._connect)
394             return sock
395
396         # To be done when I have more time...
397         raise NotImplementedError, 'Launching and managing FastCGI programs not yet implemented'
398    
399     def _fcgiGetValues(self, sock, vars):
400         # Construct FCGI_GET_VALUES record
401         outrec = Record(FCGI_GET_VALUES)
402         data = []
403         for name in vars:
404             data.append(encode_pair(name, ''))
405         data = ''.join(data)
406         outrec.contentData = data
407         outrec.contentLength = len(data)
408         outrec.write(sock)
409
410         # Await response
411         inrec = Record()
412         inrec.read(sock)
413         result = {}
414         if inrec.type == FCGI_GET_VALUES_RESULT:
415             pos = 0
416             while pos < inrec.contentLength:
417                 pos, (name, value) = decode_pair(inrec.contentData, pos)
418                 result[name] = value
419         return result
420
421     def _fcgiParams(self, sock, requestId, params):
422         rec = Record(FCGI_PARAMS, requestId)
423         data = []
424         for name,value in params.items():
425             data.append(encode_pair(name, value))
426         data = ''.join(data)
427         rec.contentData = data
428         rec.contentLength = len(data)
429         rec.write(sock)
430
431     _environPrefixes = ['SERVER_', 'HTTP_', 'REQUEST_', 'REMOTE_', 'PATH_',
432                         'CONTENT_']
433     _environCopies = ['SCRIPT_NAME', 'QUERY_STRING', 'AUTH_TYPE']
434     _environRenames = {}
435
436     def _defaultFilterEnviron(self, environ):
437         result = {}
438         for n in environ.keys():
439             for p in self._environPrefixes:
440                 if n.startswith(p):
441                     result[n] = environ[n]
442             if n in self._environCopies:
443                 result[n] = environ[n]
444             if n in self._environRenames:
445                 result[self._environRenames[n]] = environ[n]
446                
447         return result
448
449     def _lightFilterEnviron(self, environ):
450         result = {}
451         for n in environ.keys():
452             if n.upper() == n:
453                 result[n] = environ[n]
454         return result
455
456 if __name__ == '__main__':
457     from flup.server.ajp import WSGIServer
458     app = FCGIApp(connect=('localhost', 4242))
459     #import paste.lint
460     #app = paste.lint.middleware(app)
461     WSGIServer(app).run()
Note: See TracBrowser for help on using the browser.