author Matt Harbison <matt_harbison@yahoo.com>
Thu, 26 Sep 2024 18:15:36 -0400
changeset 51923 b455dfddfed0
parent 48946 642e31cb55f0
permissions -rw-r--r--
interfaces: convert the zope `Attribute` attrs to regular fields At this point, we should have a useful protocol class. The file syntax requires the type to be supplied for any fields that are declared, but we'll leave the complex ones partially unspecified for now, for simplicity. (Also, the things documented as `Callable` are really as future type annotating worked showed- roll with it for now, but they're marked as TODO for fixing later.) All of the fields and all of the attrs will need type annotations, or the type rules say they are considered to be `Any`. That can be done in a separate pass, possibly applying the `dirstate.pyi` file generated from the concrete class. The first cut of this turned the `interfaceutil.Attribute` fields into plain fields, and thus the types on them. PyCharm flagged a few things as having incompatible signatures when the concrete dirstate class subclassed this, when the concrete class has them declared as `@property`. So they've been changed to `@property` here in those cases. The remaining fields that are decorated in the concrete class have comments noting the differences. We'll see if they need to be changed going forward, but leave them for now. We'll be in trouble if the `@util.propertycache` is needed, because we can't import that module here at runtime, due to circular imports.

import base64
import hashlib

from mercurial.hgweb import common
from mercurial import node

def parse_keqv_list(req, l):
    """Parse list of key=value strings where keys are not duplicated."""
    parsed = {}
    for elt in l:
        k, v = elt.split(b'=', 1)
        if v[0:1] == b'"' and v[-1:] == b'"':
            v = v[1:-1]
        parsed[k] = v
    return parsed

class digestauthserver:
    def __init__(self):
        self._user_hashes = {}

    def gethashers(self):
        def _md5sum(x):
            m = hashlib.md5()
            return node.hex(m.digest())

        h = _md5sum

        kd = lambda s, d, h=h: h(b"%s:%s" % (s, d))
        return h, kd

    def adduser(self, user, password, realm):
        h, kd = self.gethashers()
        a1 = h(b'%s:%s:%s' % (user, realm, password))
        self._user_hashes[(user, realm)] = a1

    def makechallenge(self, realm):
        # We aren't testing the protocol here, just that the bytes make the
        # proper round trip.  So hardcoded seems fine.
        nonce = b'064af982c5b571cea6450d8eda91c20d'
        return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (

    def checkauth(self, req, header):
        log = req.rawenv[b'wsgi.errors']

        h, kd = self.gethashers()
        resp = parse_keqv_list(req, header.split(b', '))

        if resp.get(b'algorithm', b'MD5').upper() != b'MD5':
            log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm'))
            raise common.ErrorResponse(
                common.HTTP_FORBIDDEN, b"unknown algorithm"
        user = resp[b'username']
        realm = resp[b'realm']
        nonce = resp[b'nonce']

        ha1 = self._user_hashes.get((user, realm))
        if not ha1:
            log.write(b'No hash found for user/realm "%s/%s"' % (user, realm))
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user")

        qop = resp.get(b'qop', b'auth')
        if qop != b'auth':
            log.write(b"Unsupported qop: %s" % qop)
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop")

        cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc')
        if not cnonce or not ncvalue:
            log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue))
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce")

        a2 = b'%s:%s' % (req.method, resp[b'uri'])
        noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2))

        respdig = kd(ha1, noncebit)
        if respdig != resp[b'response']:
                b'User/realm "%s/%s" gave %s, but expected %s'
                % (user, realm, resp[b'response'], respdig)
            return False

        return True

digest = digestauthserver()

def perform_authentication(hgweb, req, op):
    auth = req.headers.get(b'Authorization')

    if req.headers.get(b'X-HgTest-AuthType') == b'Digest':
        if not auth:
            challenge = digest.makechallenge(b'mercurial')
            raise common.ErrorResponse(
                [(b'WWW-Authenticate', b'Digest %s' % challenge)],

        if not digest.checkauth(req, auth[7:]):
            raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')


    if not auth:
        raise common.ErrorResponse(
            [(b'WWW-Authenticate', b'Basic Realm="mercurial"')],

    if base64.b64decode(auth.split()[1]).split(b':', 1) != [b'user', b'pass']:
        raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')

def extsetup(ui):
    common.permhooks.insert(0, perform_authentication)
    digest.adduser(b'user', b'pass', b'mercurial')