Mercurial > hg
view tests/tinyproxy.py @ 21977:4ca4e1572022
run-tests: '--time' option provide more details to Linux users
As our tests execute in child processes, this patch uses os.times()
module in replace of time.time() module to provide additional info like
user time and system time spent by child's processes along with real elapsed
time taken by a process.
There is one limitation of this patch. It can work only for Linux users and
not for Windows.
"os.times" module returns a 5-tuple of a floaing point numbers.
1) User time
2) System time
3) Child's user time
4) Child's system time
5) Ellapsed real time
On Windows, only the first two items are filled, the others are zero.
Therefore, below test cases does not break on Windows but instead gives the
zero value.
author | anuraggoel <anurag.dsps@gmail.com> |
---|---|
date | Thu, 26 Jun 2014 01:22:50 +0530 |
parents | ca430fb6a668 |
children | 328739ea70c3 |
line wrap: on
line source
#!/usr/bin/env python __doc__ = """Tiny HTTP Proxy. This module implements GET, HEAD, POST, PUT and DELETE methods on BaseHTTPServer, and behaves as an HTTP proxy. The CONNECT method is also implemented experimentally, but has not been tested yet. Any help will be greatly appreciated. SUZUKI Hisao """ __version__ = "0.2.1" import BaseHTTPServer, select, socket, SocketServer, urlparse, os class ProxyHandler (BaseHTTPServer.BaseHTTPRequestHandler): __base = BaseHTTPServer.BaseHTTPRequestHandler __base_handle = __base.handle server_version = "TinyHTTPProxy/" + __version__ rbufsize = 0 # self.rfile Be unbuffered def handle(self): (ip, port) = self.client_address allowed = getattr(self, 'allowed_clients', None) if allowed is not None and ip not in allowed: self.raw_requestline = self.rfile.readline() if self.parse_request(): self.send_error(403) else: self.__base_handle() def log_request(self, code='-', size='-'): xheaders = [h for h in self.headers.items() if h[0].startswith('x-')] self.log_message('"%s" %s %s%s', self.requestline, str(code), str(size), ''.join([' %s:%s' % h for h in sorted(xheaders)])) def _connect_to(self, netloc, soc): i = netloc.find(':') if i >= 0: host_port = netloc[:i], int(netloc[i + 1:]) else: host_port = netloc, 80 print "\t" "connect to %s:%d" % host_port try: soc.connect(host_port) except socket.error, arg: try: msg = arg[1] except (IndexError, TypeError): msg = arg self.send_error(404, msg) return 0 return 1 def do_CONNECT(self): soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: if self._connect_to(self.path, soc): self.log_request(200) self.wfile.write(self.protocol_version + " 200 Connection established\r\n") self.wfile.write("Proxy-agent: %s\r\n" % self.version_string()) self.wfile.write("\r\n") self._read_write(soc, 300) finally: print "\t" "bye" soc.close() self.connection.close() def do_GET(self): (scm, netloc, path, params, query, fragment) = urlparse.urlparse( self.path, 'http') if scm != 'http' or fragment or not netloc: self.send_error(400, "bad url %s" % self.path) return soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: if self._connect_to(netloc, soc): self.log_request() soc.send("%s %s %s\r\n" % ( self.command, urlparse.urlunparse(('', '', path, params, query, '')), self.request_version)) self.headers['Connection'] = 'close' del self.headers['Proxy-Connection'] for key_val in self.headers.items(): soc.send("%s: %s\r\n" % key_val) soc.send("\r\n") self._read_write(soc) finally: print "\t" "bye" soc.close() self.connection.close() def _read_write(self, soc, max_idling=20): iw = [self.connection, soc] ow = [] count = 0 while True: count += 1 (ins, _, exs) = select.select(iw, ow, iw, 3) if exs: break if ins: for i in ins: if i is soc: out = self.connection else: out = soc try: data = i.recv(8192) except socket.error: break if data: out.send(data) count = 0 else: print "\t" "idle", count if count == max_idling: break do_HEAD = do_GET do_POST = do_GET do_PUT = do_GET do_DELETE = do_GET class ThreadingHTTPServer (SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): def __init__(self, *args, **kwargs): BaseHTTPServer.HTTPServer.__init__(self, *args, **kwargs) a = open("proxy.pid", "w") a.write(str(os.getpid()) + "\n") a.close() if __name__ == '__main__': from sys import argv if argv[1:] and argv[1] in ('-h', '--help'): print argv[0], "[port [allowed_client_name ...]]" else: if argv[2:]: allowed = [] for name in argv[2:]: client = socket.gethostbyname(name) allowed.append(client) print "Accept: %s (%s)" % (client, name) ProxyHandler.allowed_clients = allowed del argv[2:] else: print "Any clients will be served..." BaseHTTPServer.test(ProxyHandler, ThreadingHTTPServer)