--- a/tests/httpserverauth.py Tue Feb 05 13:30:48 2019 -0500
+++ b/tests/httpserverauth.py Tue Feb 05 13:32:39 2019 -0500
@@ -1,8 +1,89 @@
from __future__ import absolute_import
import base64
+import hashlib
from mercurial.hgweb import common
+from mercurial import (
+ node,
+)
+
+def parse_keqv_list(req, l):
+ """Parse list of key=value strings where keys are not duplicated."""
+ parsed = {}
+ for elt in l:
+ k, v = elt.split(b'=', 1)
+ if v[0:1] == b'"' and v[-1:] == b'"':
+ v = v[1:-1]
+ parsed[k] = v
+ return parsed
+
+class digestauthserver(object):
+ def __init__(self):
+ self._user_hashes = {}
+
+ def gethashers(self):
+ def _md5sum(x):
+ m = hashlib.md5()
+ m.update(x)
+ return node.hex(m.digest())
+
+ h = _md5sum
+
+ kd = lambda s, d, h=h: h(b"%s:%s" % (s, d))
+ return h, kd
+
+ def adduser(self, user, password, realm):
+ h, kd = self.gethashers()
+ a1 = h(b'%s:%s:%s' % (user, realm, password))
+ self._user_hashes[(user, realm)] = a1
+
+ def makechallenge(self, realm):
+ # We aren't testing the protocol here, just that the bytes make the
+ # proper round trip. So hardcoded seems fine.
+ nonce = b'064af982c5b571cea6450d8eda91c20d'
+ return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (realm,
+ nonce)
+
+ def checkauth(self, req, header):
+ log = req.rawenv[b'wsgi.errors']
+
+ h, kd = self.gethashers()
+ resp = parse_keqv_list(req, header.split(b', '))
+
+ if resp.get(b'algorithm', b'MD5').upper() != b'MD5':
+ log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm'))
+ raise common.ErrorResponse(common.HTTP_FORBIDDEN,
+ b"unknown algorithm")
+ user = resp[b'username']
+ realm = resp[b'realm']
+ nonce = resp[b'nonce']
+
+ ha1 = self._user_hashes.get((user, realm))
+ if not ha1:
+ log.write(b'No hash found for user/realm "%s/%s"' % (user, realm))
+ raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user")
+
+ qop = resp.get(b'qop', b'auth')
+ if qop != b'auth':
+ log.write(b"Unsupported qop: %s" % qop)
+ raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop")
+
+ cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc')
+ if not cnonce or not ncvalue:
+ log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue))
+ raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce")
+
+ a2 = b'%s:%s' % (req.method, resp[b'uri'])
+ noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2))
+
+ respdig = kd(ha1, noncebit)
+ if respdig != resp[b'response']:
+ log.write(b'User/realm "%s/%s" gave %s, but expected %s'
+ % (user, realm, resp[b'response'], respdig))
+ return False
+
+ return True
def perform_authentication(hgweb, req, op):
auth = req.headers.get(b'Authorization')