tests: add a module that can perform the equivalent of `SIGKILL` on any OS
I started with this being Windows specific, but let's push all of the decision
making into this function so that it can just be called by the tests. The
tradeoff is that this is very specific to sending `SIGKILL`- since
`signal.SIGKILL` doesn't exist on Windows, the desired signal can't be passed
from the caller. Maybe there's a way, but let's wait until there's a need.
We don't use `killdaemons.py` unconditionally because it starts with a more
graceful `SIGTERM` on posix.
import base64
import hashlib
from mercurial.hgweb import common
from mercurial import node
def parse_keqv_list(req, l):
"""Parse list of key=value strings where keys are not duplicated."""
parsed = {}
for elt in l:
k, v = elt.split(b'=', 1)
if v[0:1] == b'"' and v[-1:] == b'"':
v = v[1:-1]
parsed[k] = v
return parsed
class digestauthserver:
def __init__(self):
self._user_hashes = {}
def gethashers(self):
def _md5sum(x):
m = hashlib.md5()
m.update(x)
return node.hex(m.digest())
h = _md5sum
kd = lambda s, d, h=h: h(b"%s:%s" % (s, d))
return h, kd
def adduser(self, user, password, realm):
h, kd = self.gethashers()
a1 = h(b'%s:%s:%s' % (user, realm, password))
self._user_hashes[(user, realm)] = a1
def makechallenge(self, realm):
# We aren't testing the protocol here, just that the bytes make the
# proper round trip. So hardcoded seems fine.
nonce = b'064af982c5b571cea6450d8eda91c20d'
return b'realm="%s", nonce="%s", algorithm=MD5, qop="auth"' % (
realm,
nonce,
)
def checkauth(self, req, header):
log = req.rawenv[b'wsgi.errors']
h, kd = self.gethashers()
resp = parse_keqv_list(req, header.split(b', '))
if resp.get(b'algorithm', b'MD5').upper() != b'MD5':
log.write(b'Unsupported algorithm: %s' % resp.get(b'algorithm'))
raise common.ErrorResponse(
common.HTTP_FORBIDDEN, b"unknown algorithm"
)
user = resp[b'username']
realm = resp[b'realm']
nonce = resp[b'nonce']
ha1 = self._user_hashes.get((user, realm))
if not ha1:
log.write(b'No hash found for user/realm "%s/%s"' % (user, realm))
raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad user")
qop = resp.get(b'qop', b'auth')
if qop != b'auth':
log.write(b"Unsupported qop: %s" % qop)
raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"bad qop")
cnonce, ncvalue = resp.get(b'cnonce'), resp.get(b'nc')
if not cnonce or not ncvalue:
log.write(b'No cnonce (%s) or ncvalue (%s)' % (cnonce, ncvalue))
raise common.ErrorResponse(common.HTTP_FORBIDDEN, b"no cnonce")
a2 = b'%s:%s' % (req.method, resp[b'uri'])
noncebit = b"%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, h(a2))
respdig = kd(ha1, noncebit)
if respdig != resp[b'response']:
log.write(
b'User/realm "%s/%s" gave %s, but expected %s'
% (user, realm, resp[b'response'], respdig)
)
return False
return True
digest = digestauthserver()
def perform_authentication(hgweb, req, op):
auth = req.headers.get(b'Authorization')
if req.headers.get(b'X-HgTest-AuthType') == b'Digest':
if not auth:
challenge = digest.makechallenge(b'mercurial')
raise common.ErrorResponse(
common.HTTP_UNAUTHORIZED,
b'who',
[(b'WWW-Authenticate', b'Digest %s' % challenge)],
)
if not digest.checkauth(req, auth[7:]):
raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')
return
if not auth:
raise common.ErrorResponse(
common.HTTP_UNAUTHORIZED,
b'who',
[(b'WWW-Authenticate', b'Basic Realm="mercurial"')],
)
if base64.b64decode(auth.split()[1]).split(b':', 1) != [b'user', b'pass']:
raise common.ErrorResponse(common.HTTP_FORBIDDEN, b'no')
def extsetup(ui):
common.permhooks.insert(0, perform_authentication)
digest.adduser(b'user', b'pass', b'mercurial')