Mercurial > hg
diff mercurial/url.py @ 14204:5fa21960b2f4
sslutil: extracted ssl methods from httpsconnection in url.py
This makes it easier to share ssl cert validation with other http
implementations.
author | Augie Fackler <durin42@gmail.com> |
---|---|
date | Wed, 04 May 2011 22:08:55 -0500 |
parents | 924c82157d46 |
children | e7525a555a64 |
line wrap: on
line diff
--- a/mercurial/url.py Fri May 06 00:34:10 2011 +0200 +++ b/mercurial/url.py Wed May 04 22:08:55 2011 -0500 @@ -10,7 +10,7 @@ import urllib, urllib2, httplib, os, socket, cStringIO import __builtin__ from i18n import _ -import keepalive, util +import keepalive, util, sslutil def readauthforuri(ui, uri): # Read configuration @@ -202,23 +202,6 @@ has_https = hasattr(urllib2, 'HTTPSHandler') if has_https: try: - # avoid using deprecated/broken FakeSocket in python 2.6 - import ssl - _ssl_wrap_socket = ssl.wrap_socket - CERT_REQUIRED = ssl.CERT_REQUIRED - except ImportError: - CERT_REQUIRED = 2 - - def _ssl_wrap_socket(sock, key_file, cert_file, - cert_reqs=CERT_REQUIRED, ca_certs=None): - if ca_certs: - raise util.Abort(_( - 'certificate checking requires Python 2.6')) - - ssl = socket.ssl(sock, key_file, cert_file) - return httplib.FakeSocket(sock, ssl) - - try: _create_connection = socket.create_connection except AttributeError: _GLOBAL_DEFAULT_TIMEOUT = object() @@ -257,7 +240,7 @@ self.sock.connect((self.host, self.port)) if _generic_proxytunnel(self): # we do not support client x509 certificates - self.sock = _ssl_wrap_socket(self.sock, None, None) + self.sock = sslutil.ssl_wrap_socket(self.sock, None, None) else: keepalive.HTTPConnection.connect(self) @@ -398,41 +381,6 @@ _generic_start_transaction(self, h, req) return keepalive.HTTPHandler._start_transaction(self, h, req) -def _verifycert(cert, hostname): - '''Verify that cert (in socket.getpeercert() format) matches hostname. - CRLs is not handled. - - Returns error message if any problems are found and None on success. - ''' - if not cert: - return _('no certificate received') - dnsname = hostname.lower() - def matchdnsname(certname): - return (certname == dnsname or - '.' in dnsname and certname == '*.' + dnsname.split('.', 1)[1]) - - san = cert.get('subjectAltName', []) - if san: - certnames = [value.lower() for key, value in san if key == 'DNS'] - for name in certnames: - if matchdnsname(name): - return None - return _('certificate is for %s') % ', '.join(certnames) - - # subject is only checked when subjectAltName is empty - for s in cert.get('subject', []): - key, value = s[0] - if key == 'commonName': - try: - # 'subject' entries are unicode - certname = value.lower().encode('ascii') - except UnicodeEncodeError: - return _('IDN in certificate not supported') - if matchdnsname(certname): - return None - return _('certificate is for %s') % certname - return _('no commonName or subjectAltName found in certificate') - if has_https: class httpsconnection(httplib.HTTPSConnection): response_class = keepalive.HTTPResponse @@ -447,53 +395,10 @@ if self.realhostport: # use CONNECT proxy _generic_proxytunnel(self) host = self.realhostport.rsplit(':', 1)[0] - - cacerts = self.ui.config('web', 'cacerts') - hostfingerprint = self.ui.config('hostfingerprints', host) - - if cacerts and not hostfingerprint: - cacerts = util.expandpath(cacerts) - if not os.path.exists(cacerts): - raise util.Abort(_('could not find ' - 'web.cacerts: %s') % cacerts) - self.sock = _ssl_wrap_socket(self.sock, self.key_file, - self.cert_file, cert_reqs=CERT_REQUIRED, - ca_certs=cacerts) - msg = _verifycert(self.sock.getpeercert(), host) - if msg: - raise util.Abort(_('%s certificate error: %s ' - '(use --insecure to connect ' - 'insecurely)') % (host, msg)) - self.ui.debug('%s certificate successfully verified\n' % host) - else: - self.sock = _ssl_wrap_socket(self.sock, self.key_file, - self.cert_file) - if hasattr(self.sock, 'getpeercert'): - peercert = self.sock.getpeercert(True) - peerfingerprint = util.sha1(peercert).hexdigest() - nicefingerprint = ":".join([peerfingerprint[x:x + 2] - for x in xrange(0, len(peerfingerprint), 2)]) - if hostfingerprint: - if peerfingerprint.lower() != \ - hostfingerprint.replace(':', '').lower(): - raise util.Abort(_('invalid certificate for %s ' - 'with fingerprint %s') % - (host, nicefingerprint)) - self.ui.debug('%s certificate matched fingerprint %s\n' % - (host, nicefingerprint)) - else: - self.ui.warn(_('warning: %s certificate ' - 'with fingerprint %s not verified ' - '(check hostfingerprints or web.cacerts ' - 'config setting)\n') % - (host, nicefingerprint)) - else: # python 2.5 ? - if hostfingerprint: - raise util.Abort(_('no certificate for %s with ' - 'configured hostfingerprint') % host) - self.ui.warn(_('warning: %s certificate not verified ' - '(check web.cacerts config setting)\n') % - host) + self.sock = sslutil.ssl_wrap_socket( + self.sock, self.key_file, self.cert_file, + **sslutil.sslkwargs(self.ui, host)) + sslutil.validator(self.ui, host)(self.sock) class httpshandler(keepalive.KeepAliveHandler, urllib2.HTTPSHandler): def __init__(self, ui):