view tests/httpserverauth.py @ 48688:053a5bf508da

discovery: port _postprocessobsolete() changes from evolve, add tests Let's make this function obsolescence-aware as well. Now we check that revisions are not obsolete first, and only then check that they are going to be common after exchange. The tests make sure that rewriting changesets doesn't confuse head computation. They rely on experimental.single-head-per-branch feature to show that remote correctly sees the expected result (i.e. either aborts the push or allows it to succeed). They are ported from evolve as well. Differential Revision: https://phab.mercurial-scm.org/D12098
author Anton Shestakov <av6@dwimlabs.net>
date Tue, 04 Jan 2022 23:38:39 +0300
parents 2372284d9457
children 6000f5b25c9b
line wrap: on
line source

from __future__ import absolute_import

import base64
import hashlib

from mercurial.hgweb import common
from mercurial import node


def parse_keqv_list(req, l):
    """Parse list of key=value strings where keys are not duplicated."""
    parsed = {}
    for elt in l:
        k, v = elt.split(b'=', 1)
        if v[0:1] == b'"' and v[-1:] == b'"':
            v = v[1:-1]
        parsed[k] = v
    return parsed


class digestauthserver(object):
    def __init__(self):
        self._user_hashes = {}

    def gethashers(self):
        def _md5sum(x):
            m = hashlib.md5()
            m.update(x)
            return node.hex(m.digest())

        h = _md5sum

        kd = lambda s, d, h=h: h(b"%s:%s" % (s, d))
        return h, kd

    def adduser(self, user, password, realm):
        h, kd = self.gethashers()
        a1 = h(b'%s:%s:%s' % (user, realm, password))
        self._user_hashes[(user, realm)] = a1

    def makechallenge(self, realm):
        # We aren't testing the protocol here, just that the bytes make the
        # proper round trip.  So hardcoded seems fine.
        nonce = b'064af982c5b571cea6450d8eda91c20d'
        return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (
            realm,
            nonce,
        )

    def checkauth(self, req, header):
        log = req.rawenv[b'wsgi.errors']

        h, kd = self.gethashers()
        resp = parse_keqv_list(req, header.split(b', '))

        if resp.get(b'algorithm', b'MD5').upper() != b'MD5':
            log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm'))
            raise common.ErrorResponse(
                common.HTTP_FORBIDDEN, b"unknown algorithm"
            )
        user = resp[b'username']
        realm = resp[b'realm']
        nonce = resp[b'nonce']

        ha1 = self._user_hashes.get((user, realm))
        if not ha1:
            log.write(b'No hash found for user/realm "%s/%s"' % (user, realm))
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user")

        qop = resp.get(b'qop', b'auth')
        if qop != b'auth':
            log.write(b"Unsupported qop: %s" % qop)
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop")

        cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc')
        if not cnonce or not ncvalue:
            log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue))
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce")

        a2 = b'%s:%s' % (req.method, resp[b'uri'])
        noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2))

        respdig = kd(ha1, noncebit)
        if respdig != resp[b'response']:
            log.write(
                b'User/realm "%s/%s" gave %s, but expected %s'
                % (user, realm, resp[b'response'], respdig)
            )
            return False

        return True


digest = digestauthserver()


def perform_authentication(hgweb, req, op):
    auth = req.headers.get(b'Authorization')

    if req.headers.get(b'X-HgTest-AuthType') == b'Digest':
        if not auth:
            challenge = digest.makechallenge(b'mercurial')
            raise common.ErrorResponse(
                common.HTTP_UNAUTHORIZED,
                b'who',
                [(b'WWW-Authenticate', b'Digest %s' % challenge)],
            )

        if not digest.checkauth(req, auth[7:]):
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')

        return

    if not auth:
        raise common.ErrorResponse(
            common.HTTP_UNAUTHORIZED,
            b'who',
            [(b'WWW-Authenticate', b'Basic Realm="mercurial"')],
        )

    if base64.b64decode(auth.split()[1]).split(b':', 1) != [b'user', b'pass']:
        raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')


def extsetup(ui):
    common.permhooks.insert(0, perform_authentication)
    digest.adduser(b'user', b'pass', b'mercurial')