view mercurial/sslutil.py @ 14485:610873cf064a

Make pull -u behave like pull && update Previously, pull would not update if new branch heads were received, whereas pull && update would move to the tipmost branch head. Also change the "crosses branches" abort in merge.update from "crosses branches (merge branches or use --check to force update)" to "crosses branches (merge branches or update --check to force update)" since it can no longer assume the user is running hg update.
author Brendan Cully <brendan@kublai.com>
date Tue, 31 May 2011 11:52:22 -0700
parents 5fa21960b2f4
children 64dfbe576455
line wrap: on
line source

# sslutil.py - SSL handling for mercurial
#
# Copyright 2005, 2006, 2007, 2008 Matt Mackall <mpm@selenic.com>
# Copyright 2006, 2007 Alexis S. L. Carvalho <alexis@cecm.usp.br>
# Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
import os

from mercurial import util
from mercurial.i18n import _
try:
    # avoid using deprecated/broken FakeSocket in python 2.6
    import ssl
    ssl_wrap_socket = ssl.wrap_socket
    CERT_REQUIRED = ssl.CERT_REQUIRED
except ImportError:
    # No ssl module available: provide a minimal replacement built on the
    # legacy socket.ssl()/httplib.FakeSocket pair.  Both modules are only
    # needed by this fallback, so they are imported here rather than at the
    # top of the file.
    import httplib
    import socket

    # stand-in for ssl.CERT_REQUIRED, which cannot be read without ssl
    CERT_REQUIRED = 2

    def ssl_wrap_socket(sock, key_file, cert_file,
                        cert_reqs=CERT_REQUIRED, ca_certs=None):
        '''Wrap sock in an SSL layer without the ssl module.

        key_file and cert_file are passed on to socket.ssl(). cert_reqs is
        accepted for signature compatibility only and is not consulted.

        Raises util.Abort if ca_certs is set, because this fallback cannot
        validate the peer certificate against a CA store.
        '''
        if ca_certs:
            raise util.Abort(_(
                'certificate checking requires Python 2.6'))

        sslobj = socket.ssl(sock, key_file, cert_file)
        return httplib.FakeSocket(sock, sslobj)

def _verifycert(cert, hostname):
    '''Verify that cert (in socket.getpeercert() format) matches hostname.
    CRLs are not handled.

    The 'DNS' entries of subjectAltName are checked first. The subject
    commonName is consulted only when the certificate carries no 'DNS'
    entry at all (subjectAltName missing, empty, or holding only other
    entry types). A certificate name matches when it equals the lowercased
    hostname, or when it is '*.' followed by everything after the first
    label of hostname.

    Returns error message if any problems are found and None on success.
    '''
    if not cert:
        return _('no certificate received')
    dnsname = hostname.lower()
    def matchdnsname(certname):
        return (certname == dnsname or
                '.' in dnsname and certname == '*.' + dnsname.split('.', 1)[1])

    san = cert.get('subjectAltName', [])
    if san:
        certnames = [value.lower() for key, value in san if key == 'DNS']
        for name in certnames:
            if matchdnsname(name):
                return None
        # Only report a mismatch when there actually were DNS names to
        # compare against; a subjectAltName made solely of other entry
        # types must not prevent the commonName check below.
        if certnames:
            return _('certificate is for %s') % ', '.join(certnames)

    # subject is only checked when subjectAltName has no DNS entries
    for s in cert.get('subject', []):
        key, value = s[0]
        if key == 'commonName':
            try:
                # 'subject' entries are unicode
                certname = value.lower().encode('ascii')
            except UnicodeEncodeError:
                return _('IDN in certificate not supported')
            if matchdnsname(certname):
                return None
            return _('certificate is for %s') % certname
    return _('no commonName or subjectAltName found in certificate')


# CERT_REQUIRED means fetch the cert from the server all the time AND
# validate it against the CA store provided in web.cacerts.
#
# We COMPLETELY ignore CERT_REQUIRED on Python <= 2.5, as it's totally
# busted on those versions.

def sslkwargs(ui, host):
    '''Return the extra keyword arguments for ssl_wrap_socket() for host.

    An empty dict is returned when web.cacerts is not configured or when a
    fingerprint is configured for host in [hostfingerprints]. Otherwise the
    expanded web.cacerts path is returned as 'ca_certs' together with
    'cert_reqs' set to CERT_REQUIRED.

    Raises util.Abort if the expanded web.cacerts path does not exist.
    '''
    cacerts = ui.config('web', 'cacerts')
    hostfingerprint = ui.config('hostfingerprints', host)

    # a configured fingerprint takes precedence over CA validation
    if hostfingerprint or not cacerts:
        return {}

    cacerts = util.expandpath(cacerts)
    if os.path.exists(cacerts):
        return {'ca_certs': cacerts, 'cert_reqs': CERT_REQUIRED}
    raise util.Abort(_('could not find web.cacerts: %s') % cacerts)

class validator(object):
    '''Callable that checks the peer certificate of a socket for one host.

    Calling the instance with a socket does one of the following,
    depending on the web.cacerts and [hostfingerprints] configuration:

    - web.cacerts set and no fingerprint for the host: the certificate
      returned by sock.getpeercert() is checked with _verifycert();
    - otherwise, if the socket offers getpeercert(): the SHA-1 of the
      binary certificate is compared with the configured fingerprint, or
      a warning is printed when no fingerprint is configured;
    - otherwise: abort if a fingerprint is configured, else warn that the
      certificate was not verified.

    Failures are reported by raising util.Abort.
    '''

    def __init__(self, ui, host):
        self.host = host
        self.ui = ui

    def __call__(self, sock):
        ui = self.ui
        host = self.host
        cacerts = ui.config('web', 'cacerts')
        hostfingerprint = ui.config('hostfingerprints', host)

        # CA based verification
        if cacerts and not hostfingerprint:
            msg = _verifycert(sock.getpeercert(), host)
            if msg:
                raise util.Abort(_('%s certificate error: %s '
                                   '(use --insecure to connect '
                                   'insecurely)') % (host, msg))
            ui.debug('%s certificate successfully verified\n' % host)
            return

        # socket cannot hand out the peer certificate (python 2.5 ?)
        if not getattr(sock, 'getpeercert', False):
            if hostfingerprint:
                raise util.Abort(_('no certificate for %s with '
                                   'configured hostfingerprint') % host)
            ui.warn(_('warning: %s certificate not verified '
                      '(check web.cacerts config setting)\n') %
                    host)
            return

        # fingerprint based verification
        peercert = sock.getpeercert(True)
        peerfingerprint = util.sha1(peercert).hexdigest()
        # colon separated pairs of hex digits, for display only
        pairs = [peerfingerprint[x:x + 2]
                 for x in xrange(0, len(peerfingerprint), 2)]
        nicefingerprint = ":".join(pairs)

        if not hostfingerprint:
            ui.warn(_('warning: %s certificate '
                      'with fingerprint %s not verified '
                      '(check hostfingerprints or web.cacerts '
                      'config setting)\n') %
                    (host, nicefingerprint))
            return

        expected = hostfingerprint.replace(':', '').lower()
        if peerfingerprint.lower() != expected:
            raise util.Abort(_('invalid certificate for %s '
                               'with fingerprint %s') %
                             (host, nicefingerprint))
        ui.debug('%s certificate matched fingerprint %s\n' %
                 (host, nicefingerprint))