tests/httpserverauth.py
author Matt Harbison <matt_harbison@yahoo.com>
Wed, 09 Dec 2020 12:57:40 -0500
changeset 46091 af3a6900f893
parent 43076 2372284d9457
child 48875 6000f5b25c9b
permissions -rw-r--r--
run-tests: fix `HGTESTEXTRAEXTENSIONS` with py3 Since `extensions` was a str and `section` bytes, it never populated anything. If it had, it would have put bytes into the environment dictionary that is all str. As everything starts and ends as str, remove the incomplete attempt at byteification. It doesn't appear that we had any test coverage of this bit of code, so also add a non-extension config to make sure it is filtered out properly. Differential Revision: https://phab.mercurial-scm.org/D9557

from __future__ import absolute_import

import base64
import hashlib

from mercurial.hgweb import common
from mercurial import node


def parse_keqv_list(req, l):
    """Parse list of key=value strings where keys are not duplicated."""
    parsed = {}
    for elt in l:
        k, v = elt.split(b'=', 1)
        if v[0:1] == b'"' and v[-1:] == b'"':
            v = v[1:-1]
        parsed[k] = v
    return parsed


class digestauthserver(object):
    def __init__(self):
        self._user_hashes = {}

    def gethashers(self):
        def _md5sum(x):
            m = hashlib.md5()
            m.update(x)
            return node.hex(m.digest())

        h = _md5sum

        kd = lambda s, d, h=h: h(b"%s:%s" % (s, d))
        return h, kd

    def adduser(self, user, password, realm):
        h, kd = self.gethashers()
        a1 = h(b'%s:%s:%s' % (user, realm, password))
        self._user_hashes[(user, realm)] = a1

    def makechallenge(self, realm):
        # We aren't testing the protocol here, just that the bytes make the
        # proper round trip.  So hardcoded seems fine.
        nonce = b'064af982c5b571cea6450d8eda91c20d'
        return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (
            realm,
            nonce,
        )

    def checkauth(self, req, header):
        log = req.rawenv[b'wsgi.errors']

        h, kd = self.gethashers()
        resp = parse_keqv_list(req, header.split(b', '))

        if resp.get(b'algorithm', b'MD5').upper() != b'MD5':
            log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm'))
            raise common.ErrorResponse(
                common.HTTP_FORBIDDEN, b"unknown algorithm"
            )
        user = resp[b'username']
        realm = resp[b'realm']
        nonce = resp[b'nonce']

        ha1 = self._user_hashes.get((user, realm))
        if not ha1:
            log.write(b'No hash found for user/realm "%s/%s"' % (user, realm))
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user")

        qop = resp.get(b'qop', b'auth')
        if qop != b'auth':
            log.write(b"Unsupported qop: %s" % qop)
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop")

        cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc')
        if not cnonce or not ncvalue:
            log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue))
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce")

        a2 = b'%s:%s' % (req.method, resp[b'uri'])
        noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2))

        respdig = kd(ha1, noncebit)
        if respdig != resp[b'response']:
            log.write(
                b'User/realm "%s/%s" gave %s, but expected %s'
                % (user, realm, resp[b'response'], respdig)
            )
            return False

        return True


digest = digestauthserver()


def perform_authentication(hgweb, req, op):
    auth = req.headers.get(b'Authorization')

    if req.headers.get(b'X-HgTest-AuthType') == b'Digest':
        if not auth:
            challenge = digest.makechallenge(b'mercurial')
            raise common.ErrorResponse(
                common.HTTP_UNAUTHORIZED,
                b'who',
                [(b'WWW-Authenticate', b'Digest %s' % challenge)],
            )

        if not digest.checkauth(req, auth[7:]):
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')

        return

    if not auth:
        raise common.ErrorResponse(
            common.HTTP_UNAUTHORIZED,
            b'who',
            [(b'WWW-Authenticate', b'Basic Realm="mercurial"')],
        )

    if base64.b64decode(auth.split()[1]).split(b':', 1) != [b'user', b'pass']:
        raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')


def extsetup(ui):
    common.permhooks.insert(0, perform_authentication)
    digest.adduser(b'user', b'pass', b'mercurial')