Mercurial > hg
changeset 41587:ccaa52865fac
tests: add code to handle HTTP digests on the server side
It's not hooked up yet. Mostly this was cargoculted and simplified from some
python.org code[1]. It's not trying to test the security as much as it is
trying to make sure that clients are sending out the right data when challenged.
(And they aren't on py3.)
[1] http://svn.python.org/projects/sandbox/trunk/digestauth/digestauth.py
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Tue, 05 Feb 2019 13:32:39 -0500 |
parents | 7855a949b7c2 |
children | 765a608c2108 |
files | tests/httpserverauth.py |
diffstat | 1 files changed, 81 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/tests/httpserverauth.py Tue Feb 05 13:30:48 2019 -0500 +++ b/tests/httpserverauth.py Tue Feb 05 13:32:39 2019 -0500 @@ -1,8 +1,89 @@ from __future__ import absolute_import import base64 +import hashlib from mercurial.hgweb import common +from mercurial import ( + node, +) + +def parse_keqv_list(req, l): + """Parse list of key=value strings where keys are not duplicated.""" + parsed = {} + for elt in l: + k, v = elt.split(b'=', 1) + if v[0:1] == b'"' and v[-1:] == b'"': + v = v[1:-1] + parsed[k] = v + return parsed + +class digestauthserver(object): + def __init__(self): + self._user_hashes = {} + + def gethashers(self): + def _md5sum(x): + m = hashlib.md5() + m.update(x) + return node.hex(m.digest()) + + h = _md5sum + + kd = lambda s, d, h=h: h(b"%s:%s" % (s, d)) + return h, kd + + def adduser(self, user, password, realm): + h, kd = self.gethashers() + a1 = h(b'%s:%s:%s' % (user, realm, password)) + self._user_hashes[(user, realm)] = a1 + + def makechallenge(self, realm): + # We aren't testing the protocol here, just that the bytes make the + # proper round trip. So hardcoded seems fine. + nonce = b'064af982c5b571cea6450d8eda91c20d' + return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (realm, + nonce) + + def checkauth(self, req, header): + log = req.rawenv[b'wsgi.errors'] + + h, kd = self.gethashers() + resp = parse_keqv_list(req, header.split(b', ')) + + if resp.get(b'algorithm', b'MD5').upper() != b'MD5': + log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm')) + raise common.ErrorResponse(common.HTTP_FORBIDDEN, + b"unknown algorithm") + user = resp[b'username'] + realm = resp[b'realm'] + nonce = resp[b'nonce'] + + ha1 = self._user_hashes.get((user, realm)) + if not ha1: + log.write(b'No hash found for user/realm "%s/%s"' % (user, realm)) + raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user") + + qop = resp.get(b'qop', b'auth') + if qop != b'auth': + log.write(b"Unsupported qop: %s" % qop) + raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop") + + cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc') + if not cnonce or not ncvalue: + log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue)) + raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce") + + a2 = b'%s:%s' % (req.method, resp[b'uri']) + noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2)) + + respdig = kd(ha1, noncebit) + if respdig != resp[b'response']: + log.write(b'User/realm "%s/%s" gave %s, but expected %s' + % (user, realm, resp[b'response'], respdig)) + return False + + return True def perform_authentication(hgweb, req, op): auth = req.headers.get(b'Authorization')