Changeset 2:64639f1c3e8c

Show
Ignore:
Timestamp:
04/14/05 19:55:03 (3 years ago)
Author:
Allan Saddi <allan@saddi.com>
branch:
default
convert_revision:
svn:46762da8-4eb7-0310-94e9-d918b60927c8/flup/trunk@1754
Message:

Checkpoint commit.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • flup/server/ajp.py

    r1 r2  
    8080__version__ = '$Revision$' 
    8181 
    82 import sys 
    8382import socket 
    84 import select 
    85 import struct 
    86 import signal 
    8783import logging 
    88 import errno 
    89 import datetime 
    90 import time 
    9184 
    92 # Unfortunately, for now, threads are required. 
    93 import thread 
    94 import threading 
     85from ajp_base import BaseAJPServer, Connection 
     86from threadedserver import ThreadedServer 
    9587 
    9688__all__ = ['WSGIServer'] 
    9789 
    98 # Packet header prefixes. 
    99 SERVER_PREFIX = '\x12\x34' 
    100 CONTAINER_PREFIX = 'AB' 
    101  
    102 # Server packet types. 
    103 PKTTYPE_FWD_REQ = '\x02' 
    104 PKTTYPE_SHUTDOWN = '\x07' 
    105 PKTTYPE_PING = '\x08' 
    106 PKTTYPE_CPING = '\x0a' 
    107  
    108 # Container packet types. 
    109 PKTTYPE_SEND_BODY = '\x03' 
    110 PKTTYPE_SEND_HEADERS = '\x04' 
    111 PKTTYPE_END_RESPONSE = '\x05' 
    112 PKTTYPE_GET_BODY = '\x06' 
    113 PKTTYPE_CPONG = '\x09' 
    114  
    115 # Code tables for methods/headers/attributes. 
    116 methodTable = [ 
    117     None, 
    118     'OPTIONS', 
    119     'GET', 
    120     'HEAD', 
    121     'POST', 
    122     'PUT', 
    123     'DELETE', 
    124     'TRACE', 
    125     'PROPFIND', 
    126     'PROPPATCH', 
    127     'MKCOL', 
    128     'COPY', 
    129     'MOVE', 
    130     'LOCK', 
    131     'UNLOCK', 
    132     'ACL', 
    133     'REPORT', 
    134     'VERSION-CONTROL', 
    135     'CHECKIN', 
    136     'CHECKOUT', 
    137     'UNCHECKOUT', 
    138     'SEARCH', 
    139     'MKWORKSPACE', 
    140     'UPDATE', 
    141     'LABEL', 
    142     'MERGE', 
    143     'BASELINE_CONTROL', 
    144     'MKACTIVITY' 
    145     ] 
    146  
    147 requestHeaderTable = [ 
    148     None, 
    149     'Accept', 
    150     'Accept-Charset', 
    151     'Accept-Encoding', 
    152     'Accept-Language', 
    153     'Authorization', 
    154     'Connection', 
    155     'Content-Type', 
    156     'Content-Length', 
    157     'Cookie', 
    158     'Cookie2', 
    159     'Host', 
    160     'Pragma', 
    161     'Referer', 
    162     'User-Agent' 
    163     ] 
    164  
    165 attributeTable = [ 
    166     None, 
    167     'CONTEXT', 
    168     'SERVLET_PATH', 
    169     'REMOTE_USER', 
    170     'AUTH_TYPE', 
    171     'QUERY_STRING', 
    172     'JVM_ROUTE', 
    173     'SSL_CERT', 
    174     'SSL_CIPHER', 
    175     'SSL_SESSION', 
    176     None, # name follows 
    177     'SSL_KEY_SIZE' 
    178     ] 
    179  
    180 responseHeaderTable = [ 
    181     None, 
    182     'content-type', 
    183     'content-language', 
    184     'content-length', 
    185     'date', 
    186     'last-modified', 
    187     'location', 
    188     'set-cookie', 
    189     'set-cookie2', 
    190     'servlet-engine', 
    191     'status', 
    192     'www-authenticate' 
    193     ] 
    194  
    195 # The main classes use this name for logging. 
    196 LoggerName = 'ajp-wsgi' 
    197  
    198 # Set up module-level logger. 
    199 console = logging.StreamHandler() 
    200 console.setLevel(logging.DEBUG) 
    201 console.setFormatter(logging.Formatter('%(asctime)s : %(message)s', 
    202                                        '%Y-%m-%d %H:%M:%S')) 
    203 logging.getLogger(LoggerName).addHandler(console) 
    204 del console 
    205  
    206 class ProtocolError(Exception): 
    207     """ 
    208     Exception raised when the server does something unexpected or 
    209     sends garbled data. Usually leads to a Connection closing. 
    210     """ 
    211     pass 
    212  
    213 def decodeString(data, pos=0): 
    214     """Decode a string.""" 
    215     try: 
    216         length = struct.unpack('>H', data[pos:pos+2])[0] 
    217         pos += 2 
    218         if length == 0xffff: # This was undocumented! 
    219             return '', pos 
    220         s = data[pos:pos+length] 
    221         return s, pos+length+1 # Don't forget NUL 
    222     except Exception, e: 
    223         raise ProtocolError, 'decodeString: '+str(e) 
    224  
    225 def decodeRequestHeader(data, pos=0): 
    226     """Decode a request header/value pair.""" 
    227     try: 
    228         if data[pos] == '\xa0': 
    229             # Use table 
    230             i = ord(data[pos+1]) 
    231             name = requestHeaderTable[i] 
    232             if name is None: 
    233                 raise ValueError, 'bad request header code' 
    234             pos += 2 
    235         else: 
    236             name, pos = decodeString(data, pos) 
    237         value, pos = decodeString(data, pos) 
    238         return name, value, pos 
    239     except Exception, e: 
    240         raise ProtocolError, 'decodeRequestHeader: '+str(e) 
    241  
    242 def decodeAttribute(data, pos=0): 
    243     """Decode a request attribute.""" 
    244     try: 
    245         i = ord(data[pos]) 
    246         pos += 1 
    247         if i == 0xff: 
    248             # end 
    249             return None, None, pos 
    250         elif i == 0x0a: 
    251             # name follows 
    252             name, pos = decodeString(data, pos) 
    253         elif i == 0x0b: 
    254             # Special handling of SSL_KEY_SIZE. 
    255             name = attributeTable[i] 
    256             # Value is an int, not a string. 
    257             value = struct.unpack('>H', data[pos:pos+2])[0] 
    258             return name, str(value), pos+2 
    259         else: 
    260             name = attributeTable[i] 
    261             if name is None: 
    262                 raise ValueError, 'bad attribute code' 
    263         value, pos = decodeString(data, pos) 
    264         return name, value, pos 
    265     except Exception, e: 
    266         raise ProtocolError, 'decodeAttribute: '+str(e) 
    267  
    268 def encodeString(s): 
    269     """Encode a string.""" 
    270     return struct.pack('>H', len(s)) + s + '\x00' 
    271  
    272 def encodeResponseHeader(name, value): 
    273     """Encode a response header/value pair.""" 
    274     lname = name.lower() 
    275     if lname in responseHeaderTable: 
    276         # Use table 
    277         i = responseHeaderTable.index(lname) 
    278         out = '\xa0' + chr(i) 
    279     else: 
    280         out = encodeString(name) 
    281     out += encodeString(value) 
    282     return out 
    283  
    284 class Packet(object): 
    285     """An AJP message packet.""" 
    286     def __init__(self): 
    287         self.data = '' 
    288         # Don't set this on write, it will be calculated automatically. 
    289         self.length = 0 
    290  
    291     def _recvall(sock, length): 
    292         """ 
    293         Attempts to receive length bytes from a socket, blocking if necessary. 
    294         (Socket may be blocking or non-blocking.) 
    295         """ 
    296         dataList = [] 
    297         recvLen = 0 
    298         while length: 
    299             try: 
    300                 data = sock.recv(length) 
    301             except socket.error, e: 
    302                 if e[0] == errno.EAGAIN: 
    303                     select.select([sock], [], []) 
    304                     continue 
    305                 else: 
    306                     raise 
    307             if not data: # EOF 
    308                 break 
    309             dataList.append(data) 
    310             dataLen = len(data) 
    311             recvLen += dataLen 
    312             length -= dataLen 
    313         return ''.join(dataList), recvLen 
    314     _recvall = staticmethod(_recvall) 
    315  
    316     def read(self, sock): 
    317         """Attempt to read a packet from the server.""" 
    318         try: 
    319             header, length = self._recvall(sock, 4) 
    320         except socket.error: 
    321             # Treat any sort of socket errors as EOF (close Connection). 
    322             raise EOFError 
    323  
    324         if length < 4: 
    325             raise EOFError 
    326  
    327         if header[:2] != SERVER_PREFIX: 
    328             raise ProtocolError, 'invalid header' 
    329  
    330         self.length = struct.unpack('>H', header[2:4])[0] 
    331         if self.length: 
    332             try: 
    333                 self.data, length = self._recvall(sock, self.length) 
    334             except socket.error: 
    335                 raise EOFError 
    336  
    337             if length < self.length: 
    338                 raise EOFError 
    339  
    340     def _sendall(sock, data): 
    341         """ 
    342         Writes data to a socket and does not return until all the data is sent. 
    343         """ 
    344         length = len(data) 
    345         while length: 
    346             try: 
    347                 sent = sock.send(data) 
    348             except socket.error, e: 
    349                 if e[0] == errno.EPIPE: 
    350                     return # Don't bother raising an exception. Just ignore. 
    351                 elif e[0] == errno.EAGAIN: 
    352                     select.select([], [sock], []) 
    353                     continue 
    354                 else: 
    355                     raise 
    356             data = data[sent:] 
    357             length -= sent 
    358     _sendall = staticmethod(_sendall) 
    359  
    360     def write(self, sock): 
    361         """Send a packet to the server.""" 
    362         self.length = len(self.data) 
    363         self._sendall(sock, CONTAINER_PREFIX + struct.pack('>H', self.length)) 
    364         if self.length: 
    365             self._sendall(sock, self.data) 
    366  
    367 class InputStream(object): 
    368     """ 
    369     File-like object that represents the request body (if any). Supports 
    370     the bare mininum methods required by the WSGI spec. Thanks to 
    371     StringIO for ideas. 
    372     """ 
    373     def __init__(self, conn): 
    374         self._conn = conn 
    375  
    376         # See WSGIServer. 
    377         self._shrinkThreshold = conn.server.inputStreamShrinkThreshold 
    378  
    379         self._buf = '' 
    380         self._bufList = [] 
    381         self._pos = 0 # Current read position. 
    382         self._avail = 0 # Number of bytes currently available. 
    383         self._length = 0 # Set to Content-Length in request. 
    384  
    385         self.logger = logging.getLogger(LoggerName) 
    386  
    387     def bytesAvailForAdd(self): 
    388         return self._length - self._avail 
    389  
    390     def _shrinkBuffer(self): 
    391         """Gets rid of already read data (since we can't rewind).""" 
    392         if self._pos >= self._shrinkThreshold: 
    393             self._buf = self._buf[self._pos:] 
    394             self._avail -= self._pos 
    395             self._length -= self._pos 
    396             self._pos = 0 
    397  
    398             assert self._avail >= 0 and self._length >= 0 
    399  
    400     def _waitForData(self): 
    401         toAdd = min(self.bytesAvailForAdd(), 0xffff) 
    402         assert toAdd > 0 
    403         pkt = Packet() 
    404         pkt.data = PKTTYPE_GET_BODY + \ 
    405                    struct.pack('>H', toAdd) 
    406         self._conn.writePacket(pkt) 
    407         self._conn.processInput() 
    408  
    409     def read(self, n=-1): 
    410         if self._pos == self._length: 
    411             return '' 
    412         while True: 
    413             if n < 0 or (self._avail - self._pos) < n: 
    414                 # Not enough data available. 
    415                 if not self.bytesAvailForAdd(): 
    416                     # And there's no more coming. 
    417                     newPos = self._avail 
    418                     break 
    419                 else: 
    420                     # Ask for more data and wait. 
    421                     self._waitForData() 
    422                     continue 
    423             else: 
    424                 newPos = self._pos + n 
    425                 break 
    426         # Merge buffer list, if necessary. 
    427         if self._bufList: 
    428             self._buf += ''.join(self._bufList) 
    429             self._bufList = [] 
    430         r = self._buf[self._pos:newPos] 
    431         self._pos = newPos 
    432         self._shrinkBuffer() 
    433         return r 
    434  
    435     def readline(self, length=None): 
    436         if self._pos == self._length: 
    437             return '' 
    438         while True: 
    439             # Unfortunately, we need to merge the buffer list early. 
    440             if self._bufList: 
    441                 self._buf += ''.join(self._bufList) 
    442                 self._bufList = [] 
    443             # Find newline. 
    444             i = self._buf.find('\n', self._pos) 
    445             if i < 0: 
    446                 # Not found? 
    447                 if not self.bytesAvailForAdd(): 
    448                     # No more data coming. 
    449                     newPos = self._avail 
    450                     break 
    451                 else: 
    452                     # Wait for more to come. 
    453                     self._waitForData() 
    454                     continue 
    455             else: 
    456                 newPos = i + 1 
    457                 break 
    458         if length is not None: 
    459             if self._pos + length < newPos: 
    460                 newPos = self._pos + length 
    461         r = self._buf[self._pos:newPos] 
    462         self._pos = newPos 
    463         self._shrinkBuffer() 
    464         return r 
    465  
    466     def readlines(self, sizehint=0): 
    467         total = 0 
    468         lines = [] 
    469         line = self.readline() 
    470         while line: 
    471             lines.append(line) 
    472             total += len(line) 
    473             if 0 < sizehint <= total: 
    474                 break 
    475             line = self.readline() 
    476         return lines 
    477  
    478     def __iter__(self): 
    479         return self 
    480  
    481     def next(self): 
    482         r = self.readline() 
    483         if not r: 
    484             raise StopIteration 
    485         return r 
    486  
    487     def setDataLength(self, length): 
    488         """ 
    489         Once Content-Length is known, Request calls this method to set it. 
    490         """ 
    491         self._length = length 
    492  
    493     def addData(self, data): 
    494         """ 
    495         Adds data from the server to this InputStream. Note that we never ask 
    496         the server for data beyond the Content-Length, so the server should 
    497         never send us an EOF (empty string argument). 
    498         """ 
    499         if not data: 
    500             raise ProtocolError, 'short data' 
    501         self._bufList.append(data) 
    502         length = len(data) 
    503         self._avail += length 
    504         if self._avail > self._length: 
    505             raise ProtocolError, 'too much data' 
    506  
    507 class Request(object): 
    508     """ 
    509     A Request object. A more fitting name would probably be Transaction, but 
    510     it's named Request to mirror my FastCGI driver. :) This object 
    511     encapsulates all the data about the HTTP request and allows the handler 
    512     to send a response. 
    513  
    514     The only attributes/methods that the handler should concern itself 
    515     with are: environ, input, startResponse(), and write(). 
    516     """ 
    517     # Do not ever change the following value. 
    518     _maxWrite = 8192 - 4 - 3 # 8k - pkt header - send body header 
    519  
    520     def __init__(self, conn): 
    521         self._conn = conn 
    522  
    523         self.environ = { 
    524             'SCRIPT_NAME': conn.server.scriptName 
    525             } 
    526         self.input = InputStream(conn) 
    527  
    528         self._headersSent = False 
    529  
    530         self.logger = logging.getLogger(LoggerName) 
    531  
    532     def run(self): 
    533         self.logger.info('%s %s', 
    534                          self.environ['REQUEST_METHOD'], 
    535                          self.environ['REQUEST_URI']) 
    536  
    537         start = datetime.datetime.now() 
    538  
    539         try: 
    540             self._conn.server.handler(self) 
    541         except: 
    542             self.logger.exception('Exception caught from handler') 
    543             if not self._headersSent: 
    544                 self._conn.server.error(self) 
    545  
    546         end = datetime.datetime.now() 
    547  
    548         # Notify server of end of response (reuse flag is set to true). 
    549         pkt = Packet() 
    550         pkt.data = PKTTYPE_END_RESPONSE + '\x01' 
    551         self._conn.writePacket(pkt) 
    552  
    553         handlerTime = end - start 
    554         self.logger.debug('%s %s done (%.3f secs)', 
    555                           self.environ['REQUEST_METHOD'], 
    556                           self.environ['REQUEST_URI'], 
    557                           handlerTime.seconds + 
    558                           handlerTime.microseconds / 1000000.0) 
    559  
    560     # The following methods are called from the Connection to set up this 
    561     # Request. 
    562  
    563     def setMethod(self, value): 
    564         self.environ['REQUEST_METHOD'] = value 
    565  
    566     def setProtocol(self, value): 
    567         self.environ['SERVER_PROTOCOL'] = value 
    568  
    569     def setRequestURI(self, value): 
    570         self.environ['REQUEST_URI'] = value 
    571  
    572         scriptName = self._conn.server.scriptName 
    573         if not value.startswith(scriptName): 
    574             self.logger.warning('scriptName does not match request URI') 
    575  
    576         self.environ['PATH_INFO'] = value[len(scriptName):] 
    577  
    578     def setRemoteAddr(self, value): 
    579         self.environ['REMOTE_ADDR'] = value 
    580  
    581     def setRemoteHost(self, value): 
    582         self.environ['REMOTE_HOST'] = value 
    583  
    584     def setServerName(self, value): 
    585         self.environ['SERVER_NAME'] = value 
    586  
    587     def setServerPort(self, value): 
    588         self.environ['SERVER_PORT'] = str(value) 
    589  
    590     def setIsSSL(self, value): 
    591         if value: 
    592             self.environ['HTTPS'] = 'on' 
    593  
    594     def addHeader(self, name, value): 
    595         name = name.replace('-', '_').upper() 
    596         if name in ('CONTENT_TYPE', 'CONTENT_LENGTH'): 
    597             self.environ[name] = value 
    598             if name == 'CONTENT_LENGTH': 
    599                 length = int(value) 
    600                 self.input.setDataLength(length) 
    601         else: 
    602             self.environ['HTTP_'+name] = value 
    603  
    604     def addAttribute(self, name, value): 
    605         self.environ[name] = value 
    606  
    607     # The only two methods that should be called from the handler. 
    608  
    609     def startResponse(self, statusCode, statusMsg, headers): 
    610         """ 
    611         Begin the HTTP response. This must only be called once and it 
    612         must be called before any calls to write(). 
    613  
    614         statusCode is the integer status code (e.g. 200). statusMsg 
    615         is the associated reason message (e.g.'OK'). headers is a list 
    616         of 2-tuples - header name/value pairs. (Both header name and value 
    617         must be strings.) 
    618         """ 
    619         assert not self._headersSent, 'Headers already sent!' 
    620  
    621         pkt = Packet() 
    622         pkt.data = PKTTYPE_SEND_HEADERS + \ 
    623                    struct.pack('>H', statusCode) + \ 
    624                    encodeString(statusMsg) + \ 
    625                    struct.pack('>H', len(headers)) + \ 
    626                    ''.join([encodeResponseHeader(name, value) 
    627                             for name,value in headers]) 
    628  
    629         self._conn.writePacket(pkt) 
    630  
    631         self._headersSent = True 
    632  
    633     def write(self, data): 
    634         """ 
    635         Write data (which comprises the response body). Note that due to 
    636         restrictions on AJP packet size, we limit our writes to 8185 bytes 
    637         each packet. 
    638         """ 
    639         assert self._headersSent, 'Headers must be sent first!' 
    640  
    641         bytesLeft = len(data) 
    642         while bytesLeft: 
    643             toWrite = min(bytesLeft, self._maxWrite) 
    644  
    645             pkt = Packet() 
    646             pkt.data = PKTTYPE_SEND_BODY + \ 
    647                        struct.pack('>H', toWrite) + \ 
    648                        data[:toWrite] 
    649             self._conn.writePacket(pkt) 
    650  
    651             data = data[toWrite:] 
    652             bytesLeft -= toWrite 
    653  
    654 class Connection(object): 
    655     """ 
    656     A single Connection with the server. Requests are not multiplexed over the 
    657     same connection, so at any given time, the Connection is either 
    658     waiting for a request, or processing a single request. 
    659     """ 
    660     def __init__(self, sock, addr, server): 
    661         self.server = server 
    662         self._sock = sock 
    663         self._addr = addr 
    664  
    665         self._request = None 
    666  
    667         self.logger = logging.getLogger(LoggerName) 
    668  
    669     def run(self): 
    670         self.logger.debug('Connection starting up (%s:%d)', 
    671                           self._addr[0], self._addr[1]) 
    672  
    673         # Main loop. Errors will cause the loop to be exited and 
    674         # the socket to be closed. 
    675         while True: 
    676             try: 
    677                 self.processInput() 
    678             except ProtocolError, e: 
    679                 self.logger.error("Protocol error '%s'", str(e)) 
    680                 break 
    681             except EOFError: 
    682                 break 
    683             except: 
    684                 self.logger.exception('Exception caught in Connection') 
    685                 break 
    686  
    687         self.logger.debug('Connection shutting down (%s:%d)', 
    688                           self._addr[0], self._addr[1]) 
    689  
    690         self._sock.close() 
    691  
    692     def processInput(self): 
    693         """Wait for and process a single packet.""" 
    694         pkt = Packet() 
    695         select.select([self._sock], [], []) 
    696         pkt.read(self._sock) 
    697  
    698         # Body chunks have no packet type code. 
    699         if self._request is not None: 
    700             self._processBody(pkt) 
    701             return 
    702  
    703         if not pkt.length: 
    704             raise ProtocolError, 'unexpected empty packet' 
    705  
    706         pkttype = pkt.data[0] 
    707         if pkttype == PKTTYPE_FWD_REQ: 
    708             self._forwardRequest(pkt) 
    709         elif pkttype == PKTTYPE_SHUTDOWN: 
    710             self._shutdown(pkt) 
    711         elif pkttype == PKTTYPE_PING: 
    712             self._ping(pkt) 
    713         elif pkttype == PKTTYPE_CPING: 
    714             self._cping(pkt) 
    715         else: 
    716             raise ProtocolError, 'unknown packet type' 
    717  
    718     def _forwardRequest(self, pkt): 
    719         """ 
    720         Creates a Request object, fills it in from the packet, then runs it. 
    721         """ 
    722         assert self._request is None 
    723  
    724         req = self.server.requestClass(self) 
    725         i = ord(pkt.data[1]) 
    726         method = methodTable[i] 
    727         if method is None: 
    728             raise ValueError, 'bad method field' 
    729         req.setMethod(method) 
    730         value, pos = decodeString(pkt.data, 2) 
    731         req.setProtocol(value) 
    732         value, pos = decodeString(pkt.data, pos) 
    733         req.setRequestURI(value) 
    734         value, pos = decodeString(pkt.data, pos) 
    735         req.setRemoteAddr(value) 
    736         value, pos = decodeString(pkt.data, pos) 
    737         req.setRemoteHost(value) 
    738         value, pos = decodeString(pkt.data, pos) 
    739         req.setServerName(value) 
    740         value = struct.unpack('>H', pkt.data[pos:pos+2])[0] 
    741         req.setServerPort(value) 
    742         i = ord(pkt.data[pos+2]) 
    743         req.setIsSSL(i != 0) 
    744  
    745         # Request headers. 
    746         numHeaders = struct.unpack('>H', pkt.data[pos+3:pos+5])[0] 
    747         pos += 5 
    748         for i in range(numHeaders): 
    749             name, value, pos = decodeRequestHeader(pkt.data, pos) 
    750             req.addHeader(name, value) 
    751  
    752         # Attributes. 
    753         while True: 
    754             name, value, pos = decodeAttribute(pkt.data, pos) 
    755             if name is None: 
    756                 break 
    757             req.addAttribute(name, value) 
    758  
    759         self._request = req 
    760  
    761         # Read first body chunk, if needed. 
    762         if req.input.bytesAvailForAdd(): 
    763             self.processInput() 
    764  
    765         # Run Request. 
    766         req.run() 
    767  
    768         self._request = None 
    769  
    770     def _shutdown(self, pkt): 
    771         """Not sure what to do with this yet.""" 
    772         self.logger.info('Received shutdown request from server') 
    773  
    774     def _ping(self, pkt): 
    775         """I have no idea what this packet means.""" 
    776         self.logger.debug('Received ping') 
    777  
    778     def _cping(self, pkt): 
    779         """Respond to a PING (CPING) packet.""" 
    780         self.logger.debug('Received PING, sending PONG') 
    781         pkt = Packet() 
    782         pkt.data = PKTTYPE_CPONG 
    783         self.writePacket(pkt) 
    784  
    785     def _processBody(self, pkt): 
    786         """ 
    787         Handles a body chunk from the server by appending it to the 
    788         InputStream. 
    789         """ 
    790         if pkt.length: 
    791             length = struct.unpack('>H', pkt.data[:2])[0] 
    792             self._request.input.addData(pkt.data[2:2+length]) 
    793         else: 
    794             # Shouldn't really ever get here. 
    795             self._request.input.addData('') 
    796  
    797     def writePacket(self, pkt): 
    798         """Sends a Packet to the server.""" 
    799         pkt.write(self._sock) 
    800  
    801 class ThreadPool(object): 
    802     """ 
    803     Thread pool that maintains the number of idle threads between 
    804     minSpare and maxSpare inclusive. By default, there is no limit on 
    805     the number of threads that can be started, but this can be controlled 
    806     by maxThreads. 
    807     """ 
    808     def __init__(self, minSpare=1, maxSpare=5, maxThreads=sys.maxint): 
    809         self._minSpare = minSpare 
    810         self._maxSpare = maxSpare 
    811         self._maxThreads = max(minSpare, maxThreads) 
    812  
    813         self._lock = threading.Condition() 
    814         self._workQueue = [] 
    815         self._idleCount = self._workerCount = maxSpare 
    816  
    817         # Start the minimum number of worker threads. 
    818         for i in range(maxSpare): 
    819             thread.start_new_thread(self._worker, ()) 
    820  
    821     def addJob(self, job, allowQueuing=True): 
    822         """ 
    823         Adds a job to the work queue. The job object should have a run() 
    824         method. If allowQueuing is True (the default), the job will be 
    825         added to the work queue regardless if there are any idle threads 
    826         ready. (The only way for there to be no idle threads is if maxThreads 
    827         is some reasonable, finite limit.) 
    828  
    829         Otherwise, if allowQueuing is False, and there are no more idle 
    830         threads, the job will not be queued. 
    831  
    832         Returns True if the job was queued, False otherwise. 
    833         """ 
    834         self._lock.acquire() 
    835         try: 
    836             # Maintain minimum number of spares. 
    837             while self._idleCount < self._minSpare and \ 
    838                   self._workerCount < self._maxThreads: 
    839                 self._workerCount += 1 
    840                 self._idleCount += 1 
    841                 thread.start_new_thread(self._worker, ()) 
    842  
    843             # Hand off the job. 
    844             if self._idleCount or allowQueuing: 
    845                 self._workQueue.append(job) 
    846                 self._lock.notify() 
    847                 return True 
    848             else: 
    849                 return False 
    850         finally: 
    851             self._lock.release() 
    852  
    853     def _worker(self): 
    854         """ 
    855         Worker thread routine. Waits for a job, executes it, repeat. 
    856         """ 
    857         self._lock.acquire() 
    858         while True: 
    859             while not self._workQueue: 
    860                 self._lock.wait() 
    861  
    862             # We have a job to do... 
    863             job = self._workQueue.pop(0) 
    864  
    865             assert self._idleCount > 0 
    866             self._idleCount -= 1 
    867  
    868             self._lock.release() 
    869  
    870             job.run() 
    871  
    872             self._lock.acquire() 
    873  
    874             if self._idleCount == self._maxSpare: 
    875                 break # NB: lock still held 
    876             self._idleCount += 1 
    877             assert self._idleCount <= self._maxSpare 
    878  
    879         # Die off... 
    880         assert self._workerCount > self._maxSpare 
    881         self._workerCount -= 1 
    882  
    883         self._lock.release() 
    884  
    885 class WSGIServer(object): 
     90class WSGIServer(BaseAJPServer, ThreadedServer): 
    88691    """ 
    88792    AJP1.3/WSGI server. Runs your WSGI application as a persistant program 
     
    896101    mod_jk) - see <http://jakarta.apache.org/tomcat/connectors-doc/>. 
    897102    """ 
    898     # What Request class to use. 
    899     requestClass = Request 
    900  
    901     # Limits the size of the InputStream's string buffer to this size + 8k. 
    902     # Since the InputStream is not seekable, we throw away already-read 
    903     # data once this certain amount has been read. (The 8k is there because 
    904     # it is the maximum size of new data added per chunk.) 
    905     inputStreamShrinkThreshold = 102400 - 8192 
    906  
    907103    def __init__(self, application, scriptName='', environ=None, 
    908104                 multithreaded=True, 
     
    918114        environment variables you want to pass to your application. 
    919115 
    920         Set multithreaded to False if your application is not thread-safe. 
    921  
    922116        bindAddress is the address to bind to, which must be a tuple of 
    923117        length 2. The first element is a string, which is the host name 
     
    930124 
    931125        loggingLevel sets the logging level of the module-level logger. 
     126        """ 
     127        BaseAJPServer.__init__(self, application, 
     128                               scriptName=scriptName, 
     129                               environ=environ, 
     130                               multithreaded=multithreaded, 
     131                               bindAddress=bindAddress, 
     132                               allowedServers=allowedServers, 
     133                               loggingLevel=loggingLevel) 
     134        for key in ('jobClass', 'jobArgs'): 
     135            if kw.has_key(key): 
     136                del kw[key] 
     137        ThreadedServer.__init__(self, jobClass=Connection, jobArgs=(self,), 
     138                                **kw) 
    932139 
    933         Any additional keyword arguments are passed to the underlying 
    934         ThreadPool. 
    935         """ 
    936         if environ is None: 
    937             environ = {} 
    938  
    939         self.application = application 
    940         self.scriptName = scriptName 
    941         self.environ = environ 
    942         self.multithreaded = multithreaded 
    943         self._bindAddress = bindAddress 
    944         self._allowedServers = allowedServers 
    945  
    946         # Used to force single-threadedness. 
    947         self._appLock = thread.allocate_lock() 
    948  
    949         self._threadPool = ThreadPool(**kw) 
    950  
    951         self.logger = logging.getLogger(LoggerName) 
    952         self.logger.setLevel(loggingLevel) 
    953  
    954     def _setupSocket(self): 
    955         """Creates and binds the socket for communication with the server.""" 
    956         sock = socket.socket() 
    957         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
    958         sock.bind(self._bindAddress) 
    959         sock.listen(socket.SOMAXCONN) 
    960         return sock 
    961  
    962     def _cleanupSocket(self, sock): 
    963         """Closes the main socket.""" 
    964         sock.close() 
    965  
    966     def _isServerAllowed(self, addr): 
    967         return self._allowedServers is None or \ 
    968                addr[0] in self._allowedServers 
    969  
    970     def _installSignalHandlers(self): 
    971         self._oldSIGs = [(x,signal.getsignal(x)) for x in 
    972                          (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)] 
    973         signal.signal(signal.SIGHUP, self._hupHandler) 
    974         signal.signal(signal.SIGINT, self._intHandler) 
    975         signal.signal(signal.SIGTERM, self._intHandler) 
    976  
    977     def _restoreSignalHandlers(self): 
    978         for signum,handler in self._oldSIGs: 
    979             signal.signal(signum, handler) 
    980          
    981     def _hupHandler(self, signum, frame): 
    982         self._hupReceived = True 
    983         self._keepGoing = False 
    984  
    985     def _intHandler(self, signum, frame): 
    986         self._keepGoing = False 
    987  
    988     def run(self, timeout=1.0): 
     140    def run(self): 
    989141        """ 
    990142        Main loop. Call this after instantiating WSGIServer. SIGHUP, SIGINT, 
    991         SIGTERM cause it to cleanup and return. (If a SIGHUP is caught, this 
    992         method returns True. Returns False otherwise.) 
     143        SIGQUIT, SIGTERM cause it to cleanup and return. (If a SIGHUP 
     144        is caught, this method returns True. Returns False otherwise.) 
    993145        """ 
    994146        self.logger.info('%s starting up', self.__class__.__name__) 
     
    1000152            return False 
    1001153 
    1002         self._keepGoing = True 
    1003         self._hupReceived = False 
    1004  
    1005         # Install signal handlers. 
    1006         self._installSignalHandlers() 
    1007  
    1008         while self._keepGoing: 
    1009             try: 
    1010                 r, w, e = select.select([sock], [], [], timeout) 
    1011             except select.error, e: 
    1012                 if e[0] == errno.EINTR: 
    1013                     continue 
    1014                 raise 
    1015  
    1016             if r: 
    1017                 try: 
    1018                     clientSock, addr = sock.accept() 
    1019                 except socket.error, e: 
    1020                     if e[0] in (errno.EINTR, errno.EAGAIN): 
    1021                         continue 
    1022                     raise 
    1023  
    1024                 if not self._isServerAllowed(addr): 
    1025                     self.logger.warning('Server connection from %s disallowed', 
    1026                                         addr[0]) 
    1027                     clientSock.close() 
    1028                     continue 
    1029  
    1030                 # Hand off to Connection. 
    1031                 conn = Connection(clientSock, addr, self) 
    1032                 if not self._threadPool.addJob(conn, allowQueuing=False): 
    1033                     # No thread left, immediately close the socket to hopefully 
    1034                     # indicate to the web server that we're at our limit... 
    1035                     # and to prevent having too many opened (and useless) 
    1036                     # files. 
    1037                     clientSock.close() 
    1038  
    1039             self._mainloopPeriodic() 
    1040  
    1041         # Restore old signal handlers. 
    1042         self._restoreSignalHandlers() 
     154        ret = ThreadedServer.run(self, sock) 
    1043155 
    1044156        self._cleanupSocket(sock) 
     
    1047159                         self._hupReceived and ' (reload requested)' or '') 
    1048160 
    1049         return self._hupReceived 
    1050  
    1051     def _mainloopPeriodic(self): 
    1052         """ 
    1053         Called with just about each iteration of the main loop. Meant to 
    1054         be overridden. 
    1055         """ 
    1056         pass 
    1057  
    1058     def _exit(self, reload=False): 
    1059         """ 
    1060         Protected convenience method for subclasses to force an exit. Not 
    1061         really thread-safe, which is why it isn't public. 
    1062         """ 
    1063         if self._keepGoing: 
    1064             self._keepGoing = False 
    1065             self._hupReceived = reload 
    1066  
    1067     def handler(self, request): 
    1068         """ 
    1069         WSGI handler. Sets up WSGI environment, calls the application, 
    1070         and sends the application's response. 
    1071         """ 
    1072         environ = request.environ 
    1073         environ.update(self.environ) 
    1074  
    1075         environ['wsgi.version'] = (1,0) 
    1076         environ['wsgi.input'] = request.input 
    1077         environ['wsgi.errors'] = sys.stderr 
    1078         environ['wsgi.multithread'] = self.multithreaded 
    1079         environ['wsgi.multiprocess'] = True 
    1080         environ['wsgi.run_once'] = False 
    1081  
    1082         if environ.get('HTTPS', 'off') in ('on', '1'): 
    1083             environ['wsgi.url_scheme'] = 'https' 
    1084         else: 
    1085             environ['wsgi.url_scheme'] = 'http' 
    1086  
    1087         headers_set = [] 
    1088         headers_sent = [] 
    1089         result = None 
    1090  
    1091         def write(data): 
    1092             assert type(data) is str, 'write() argument must be string' 
    1093             assert headers_set, 'write() before start_response()' 
    1094  
    1095             if not headers_sent: 
    1096                 status, responseHeaders = headers_sent[:] = headers_set 
    1097                 statusCode = int(status[:3]) 
    1098                 statusMsg = status[4:] 
    1099                 found = False 
    1100                 for header,value in responseHeaders: 
    1101                     if header.lower() == 'content-length': 
    1102                         found = True 
    1103                         break 
    1104                 if not found and result is not None: 
    1105                     try: 
    1106                         if len(result) == 1: 
    1107                             responseHeaders.append(('Content-Length', 
    1108                                                     str(len(data)))) 
    1109                     except: 
    1110                         pass 
    1111                 request.startResponse(statusCode, statusMsg, responseHeaders) 
    1112  
    1113             request.write(data) 
    1114  
    1115         def start_response(status, response_headers, exc_info=None): 
    1116             if exc_info: 
    1117                 try: 
    1118                     if headers_sent: 
    1119                         # Re-raise if too late 
    1120                         raise exc_info[0], exc_info[1], exc_info[2] 
    1121                 finally: 
    1122                     exc_info = None # avoid dangling circular ref 
    1123             else: 
    1124                 assert not headers_set, 'Headers already set!' 
    1125  
    1126             assert type(status) is str, 'Status must be a string' 
    1127             assert len(status) >= 4, 'Status must be at least 4 characters' 
    1128             assert int(status[:3]), 'Status must begin with 3-digit code' 
    1129             assert status[3] == ' ', 'Status must have a space after code' 
    1130             assert type(response_headers) is list, 'Headers must be a list' 
    1131             if __debug__: 
    1132                 for name,val in response_headers: 
    1133                     assert type(name) is str, 'Header names must be strings' 
    1134                     assert type(val) is str, 'Header values must be strings' 
    1135  
    1136             headers_set[:] = [status, response_headers] 
    1137             return write 
    1138  
    1139         if not self.multithreaded: 
    1140             self._appLock.acquire() 
    1141         try: 
    1142             result = self.application(environ, start_response) 
    1143             try: 
    1144                 for data in result: 
    1145                     if data: 
    1146                         write(data) 
    1147                 if not headers_sent: 
    1148                     write('') # in case body was empty 
    1149             finally: 
    1150                 if hasattr(result, 'close'): 
    1151                     result.close() 
    1152         finally: