tests/httpserverauth.py
author Manuel Jacob <me@manueljacob.de>
Mon, 01 Jun 2020 20:57:14 +0200
changeset 45005 1ca0047fd7e1
parent 43076 2372284d9457
child 48966 6000f5b25c9b
permissions -rw-r--r--
absorb: preserve changesets which were already empty Most commands in Mercurial (commit, rebase, absorb itself) don’t create empty changesets or drop them if they become empty. If there’s a changeset that’s empty, it must be a deliberate choice of the user. At least it shouldn’t be absorb’s responsibility to prune them. The fact that changesets that became empty during absorb are pruned, is unaffected by this. This case was found while writing patches which make it possible to configure absorb and rebase to not drop empty changesets. Even without having such config set, I think it’s valuable to preserve changesets which were already empty.

from __future__ import absolute_import

import base64
import hashlib

from mercurial.hgweb import common
from mercurial import node


def parse_keqv_list(req, l):
    """Parse list of key=value strings where keys are not duplicated."""
    parsed = {}
    for elt in l:
        k, v = elt.split(b'=', 1)
        if v[0:1] == b'"' and v[-1:] == b'"':
            v = v[1:-1]
        parsed[k] = v
    return parsed


class digestauthserver(object):
    def __init__(self):
        self._user_hashes = {}

    def gethashers(self):
        def _md5sum(x):
            m = hashlib.md5()
            m.update(x)
            return node.hex(m.digest())

        h = _md5sum

        kd = lambda s, d, h=h: h(b"%s:%s" % (s, d))
        return h, kd

    def adduser(self, user, password, realm):
        h, kd = self.gethashers()
        a1 = h(b'%s:%s:%s' % (user, realm, password))
        self._user_hashes[(user, realm)] = a1

    def makechallenge(self, realm):
        # We aren't testing the protocol here, just that the bytes make the
        # proper round trip.  So hardcoded seems fine.
        nonce = b'064af982c5b571cea6450d8eda91c20d'
        return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (
            realm,
            nonce,
        )

    def checkauth(self, req, header):
        log = req.rawenv[b'wsgi.errors']

        h, kd = self.gethashers()
        resp = parse_keqv_list(req, header.split(b', '))

        if resp.get(b'algorithm', b'MD5').upper() != b'MD5':
            log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm'))
            raise common.ErrorResponse(
                common.HTTP_FORBIDDEN, b"unknown algorithm"
            )
        user = resp[b'username']
        realm = resp[b'realm']
        nonce = resp[b'nonce']

        ha1 = self._user_hashes.get((user, realm))
        if not ha1:
            log.write(b'No hash found for user/realm "%s/%s"' % (user, realm))
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user")

        qop = resp.get(b'qop', b'auth')
        if qop != b'auth':
            log.write(b"Unsupported qop: %s" % qop)
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop")

        cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc')
        if not cnonce or not ncvalue:
            log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue))
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce")

        a2 = b'%s:%s' % (req.method, resp[b'uri'])
        noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2))

        respdig = kd(ha1, noncebit)
        if respdig != resp[b'response']:
            log.write(
                b'User/realm "%s/%s" gave %s, but expected %s'
                % (user, realm, resp[b'response'], respdig)
            )
            return False

        return True


digest = digestauthserver()


def perform_authentication(hgweb, req, op):
    auth = req.headers.get(b'Authorization')

    if req.headers.get(b'X-HgTest-AuthType') == b'Digest':
        if not auth:
            challenge = digest.makechallenge(b'mercurial')
            raise common.ErrorResponse(
                common.HTTP_UNAUTHORIZED,
                b'who',
                [(b'WWW-Authenticate', b'Digest %s' % challenge)],
            )

        if not digest.checkauth(req, auth[7:]):
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')

        return

    if not auth:
        raise common.ErrorResponse(
            common.HTTP_UNAUTHORIZED,
            b'who',
            [(b'WWW-Authenticate', b'Basic Realm="mercurial"')],
        )

    if base64.b64decode(auth.split()[1]).split(b':', 1) != [b'user', b'pass']:
        raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')


def extsetup(ui):
    common.permhooks.insert(0, perform_authentication)
    digest.adduser(b'user', b'pass', b'mercurial')