Mercurial > hg
view tests/tinyproxy.py @ 13163:2fa2e6444645 stable
https: warn when server certificate isn't verified
Mercurial will verify HTTPS server certificates if web.cacerts is configured,
but it will by default silently not verify any certificates.
We now warn the user that when the certificate isn't verified she won't get the
security she might expect from https:
warning: localhost certificate not verified (check web.cacerts config setting)
Self-signed certificates can be accepted silently by configuring web.cacerts to
point to a suitable certificate file.
author | Mads Kiilerich <mads@kiilerich.com> |
---|---|
date | Sat, 18 Dec 2010 21:58:52 +0100 |
parents | 08a0f04b56bd |
children | ce99d887585f |
line wrap: on
line source
#!/usr/bin/env python __doc__ = """Tiny HTTP Proxy. This module implements GET, HEAD, POST, PUT and DELETE methods on BaseHTTPServer, and behaves as an HTTP proxy. The CONNECT method is also implemented experimentally, but has not been tested yet. Any help will be greatly appreciated. SUZUKI Hisao """ __version__ = "0.2.1" import BaseHTTPServer, select, socket, SocketServer, urlparse class ProxyHandler (BaseHTTPServer.BaseHTTPRequestHandler): __base = BaseHTTPServer.BaseHTTPRequestHandler __base_handle = __base.handle server_version = "TinyHTTPProxy/" + __version__ rbufsize = 0 # self.rfile Be unbuffered def handle(self): (ip, port) = self.client_address if hasattr(self, 'allowed_clients') and ip not in self.allowed_clients: self.raw_requestline = self.rfile.readline() if self.parse_request(): self.send_error(403) else: self.__base_handle() def _connect_to(self, netloc, soc): i = netloc.find(':') if i >= 0: host_port = netloc[:i], int(netloc[i + 1:]) else: host_port = netloc, 80 print "\t" "connect to %s:%d" % host_port try: soc.connect(host_port) except socket.error, arg: try: msg = arg[1] except: msg = arg self.send_error(404, msg) return 0 return 1 def do_CONNECT(self): soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: if self._connect_to(self.path, soc): self.log_request(200) self.wfile.write(self.protocol_version + " 200 Connection established\r\n") self.wfile.write("Proxy-agent: %s\r\n" % self.version_string()) self.wfile.write("\r\n") self._read_write(soc, 300) finally: print "\t" "bye" soc.close() self.connection.close() def do_GET(self): (scm, netloc, path, params, query, fragment) = urlparse.urlparse( self.path, 'http') if scm != 'http' or fragment or not netloc: self.send_error(400, "bad url %s" % self.path) return soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: if self._connect_to(netloc, soc): self.log_request() soc.send("%s %s %s\r\n" % ( self.command, urlparse.urlunparse(('', '', path, params, query, '')), self.request_version)) self.headers['Connection'] = 'close' del self.headers['Proxy-Connection'] for key_val in self.headers.items(): soc.send("%s: %s\r\n" % key_val) soc.send("\r\n") self._read_write(soc) finally: print "\t" "bye" soc.close() self.connection.close() def _read_write(self, soc, max_idling=20): iw = [self.connection, soc] ow = [] count = 0 while 1: count += 1 (ins, _, exs) = select.select(iw, ow, iw, 3) if exs: break if ins: for i in ins: if i is soc: out = self.connection else: out = soc data = i.recv(8192) if data: out.send(data) count = 0 else: print "\t" "idle", count if count == max_idling: break do_HEAD = do_GET do_POST = do_GET do_PUT = do_GET do_DELETE = do_GET class ThreadingHTTPServer (SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): pass if __name__ == '__main__': from sys import argv if argv[1:] and argv[1] in ('-h', '--help'): print argv[0], "[port [allowed_client_name ...]]" else: if argv[2:]: allowed = [] for name in argv[2:]: client = socket.gethostbyname(name) allowed.append(client) print "Accept: %s (%s)" % (client, name) ProxyHandler.allowed_clients = allowed del argv[2:] else: print "Any clients will be served..." BaseHTTPServer.test(ProxyHandler, ThreadingHTTPServer)