Mercurial > hg
view contrib/hgclient.py @ 43964:8f67735344ae
tests: convert the `root` arg of matchmod.match() to local path separators
This fixes tests that broke with 8b1a9ba375e5, complaining that "X not under
root /repo". The vast majority of real uses are to pass `repo.root`, which is
normalized by `wdirvfs.base` being set to the result of `os.path.realpath()`.
Failure to convert looks like this:
--- c:/Users/Matt/hg/tests/test-match.py.out
+++ c:/Users/Matt/hg/tests/test-match.py.err
@@ -0,0 +1,48 @@
+ERROR: testVisitchildrensetGlob (__main__.IncludeMatcherTests)
+
+Traceback (most recent call last):
+ File "c:\Users\Matt\hg\tests\test-match.py", line 180, in testVisitchildrensetGlob
+ m = matchmod.match(b'/repo', b'', include=[b'glob:dir/z*'])
+ File "c:\Users\Matt\hg\mercurial\match.py", line 271, in match
+ kindpats = normalize(include, b'glob', root, cwd, auditor, warn)
+ File "c:\Users\Matt\hg\mercurial\match.py", line 322, in _donormalize
+ pat = pathutil.canonpath(root, cwd, pat, auditor=auditor)
+ File "c:\Users\Matt\hg\mercurial\pathutil.py", line 251, in canonpath
+ _(b"%s not under root '%s'") % (myname, root), hint=hint
+Abort: dir/z* not under root '/repo'
+ERROR: testVisitdirGlob (__main__.IncludeMatcherTests)
Differential Revision: https://phab.mercurial-scm.org/D7724
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Thu, 26 Dec 2019 16:45:56 -0500 |
parents | 9f70512ae2cf |
children | 6000f5b25c9b |
line wrap: on
line source
# A minimal client for Mercurial's command server

from __future__ import absolute_import, print_function

import io
import os
import re
import signal
import socket
import struct
import subprocess
import sys
import time

if sys.version_info[0] >= 3:
    # On Python 3 all protocol traffic is bytes, so talk to the binary
    # layer underneath the text-mode standard streams.
    stdout = sys.stdout.buffer
    stderr = sys.stderr.buffer
    stringio = io.BytesIO

    def bprint(*args):
        """Write the bytes ``args`` to stdout, space-joined, plus newline."""
        # remove b'' as well for ease of test migration
        pargs = [re.sub(br'''\bb(['"])''', br'\1', b'%s' % a) for a in args]
        stdout.write(b' '.join(pargs) + b'\n')


else:
    import cStringIO

    stdout = sys.stdout
    stderr = sys.stderr
    stringio = cStringIO.StringIO
    bprint = print


def connectpipe(path=None, extraargs=()):
    """Spawn ``hg serve --cmdserver pipe`` and return the Popen object.

    ``path``, when given, is passed to the server with ``-R``.
    ``extraargs`` is an iterable of additional command line arguments.
    The child's stdin and stdout are pipes owned by the returned object.
    """
    cmdline = [b'hg', b'serve', b'--cmdserver', b'pipe']
    if path:
        cmdline += [b'-R', path]
    cmdline.extend(extraargs)

    def tonative(cmdline):
        # Only on Windows ('nt') are the bytes arguments decoded to str.
        if os.name != 'nt':
            return cmdline
        return [arg.decode("utf-8") for arg in cmdline]

    server = subprocess.Popen(
        tonative(cmdline), stdin=subprocess.PIPE, stdout=subprocess.PIPE
    )

    return server


class unixconnection(object):
    """Client side of a unix-socket command server connection.

    Exposes ``stdin``/``stdout`` file objects and ``wait()`` so that it can
    be used by the helpers in this module in place of a pipe server.
    """

    def __init__(self, sockpath):
        self.sock = sock = socket.socket(socket.AF_UNIX)
        sock.connect(sockpath)
        self.stdin = sock.makefile('wb')
        self.stdout = sock.makefile('rb')

    def wait(self):
        """Close both file objects and the underlying socket."""
        self.stdin.close()
        self.stdout.close()
        self.sock.close()


class unixserver(object):
    """A ``hg serve --cmdserver unix`` process listening on ``sockpath``."""

    def __init__(self, sockpath, logpath=None, repopath=None):
        """Start the server and wait until its socket path exists.

        ``logpath``, when given, receives the server's stdout and stderr
        (opened in append mode).  ``repopath``, when given, is passed to
        the server with ``-R``.
        """
        self.sockpath = sockpath
        cmdline = [b'hg', b'serve', b'--cmdserver', b'unix', b'-a', sockpath]
        if repopath:
            cmdline += [b'-R', repopath]
        logfile = None
        if logpath:
            logfile = open(logpath, 'a')
            errdest = subprocess.STDOUT
        else:
            errdest = None
        try:
            self.server = subprocess.Popen(
                cmdline, stdout=logfile, stderr=errdest
            )
        finally:
            # The child holds its own copy of the descriptor; keeping the
            # parent's handle open would only leak it.
            if logfile is not None:
                logfile.close()
        # wait for listen()
        while self.server.poll() is None:
            if os.path.exists(sockpath):
                break
            time.sleep(0.1)

    def connect(self):
        """Return a new unixconnection to this server's socket."""
        return unixconnection(self.sockpath)

    def shutdown(self):
        """Send SIGTERM to the server process and wait for it to exit."""
        os.kill(self.server.pid, signal.SIGTERM)
        self.server.wait()


def writeblock(server, data):
    """Send ``data`` to the server, prefixed by its big-endian 32-bit length."""
    server.stdin.write(struct.pack(b'>I', len(data)))
    server.stdin.write(data)
    server.stdin.flush()


def readchannel(server):
    """Read one channel frame from the server.

    A frame starts with a 5 byte header: a one byte channel identifier
    followed by a big-endian unsigned 32-bit length.  For the ``I`` and
    ``L`` channels the length itself is returned; for any other channel
    that many bytes are read and returned as the payload.

    Returns a ``(channel, length_or_data)`` tuple.  Raises EOFError when
    the stream ends before a complete header could be read.
    """
    data = server.stdout.read(5)
    # A short read means the stream ended; report a truncated header as
    # EOF rather than letting struct.unpack() fail on it.
    if len(data) < 5:
        raise EOFError
    channel, length = struct.unpack('>cI', data)
    if channel in b'IL':
        return channel, length
    else:
        return channel, server.stdout.read(length)


def sep(text):
    """Return ``text`` with every backslash replaced by a forward slash."""
    return text.replace(b'\\', b'/')


def runcommand(
    server, args, output=stdout, error=stderr, input=None, outfilter=lambda x: x
):
    """Run the command ``args`` on ``server`` and relay its channels.

    ``args`` is a list of bytes.  Data from the ``o`` channel is passed
    through ``outfilter`` and written to ``output``; data from the ``e``
    channel is written to ``error``.  Requests on the ``I`` and ``L``
    channels are answered from ``input`` (an empty buffer when not given).

    Returns the integer result sent on the ``r`` channel, or None when an
    unexpected upper-case channel is received.
    """
    bprint(b'*** runcommand', b' '.join(args))
    stdout.flush()
    server.stdin.write(b'runcommand\n')
    writeblock(server, b'\0'.join(args))

    if not input:
        input = stringio()

    while True:
        ch, data = readchannel(server)
        if ch == b'o':
            output.write(outfilter(data))
            output.flush()
        elif ch == b'e':
            error.write(data)
            error.flush()
        elif ch == b'I':
            # for 'I' and 'L', data is the requested size, not a payload
            writeblock(server, input.read(data))
        elif ch == b'L':
            writeblock(server, input.readline(data))
        elif ch == b'm':
            bprint(b"message: %r" % data)
        elif ch == b'r':
            (ret,) = struct.unpack('>i', data)
            if ret != 0:
                bprint(b' [%d]' % ret)
            return ret
        else:
            bprint(b"unexpected channel %c: %r" % (ch, data))
            # an unknown upper-case channel ends the exchange
            if ch.isupper():
                return


def check(func, connect=connectpipe):
    """Call ``func`` with a freshly connected server and return its result.

    The server's stdin is closed and the server is waited for afterwards,
    even if ``func`` raises.
    """
    stdout.flush()
    server = connect()
    try:
        return func(server)
    finally:
        server.stdin.close()
        server.wait()


def checkwith(connect=connectpipe, **kwargs):
    """Return a decorator running the function via check().

    The server is obtained from ``connect(**kwargs)``.  The decorated
    function is run immediately; the decorator returns its result.
    """

    def wrap(func):
        return check(func, lambda: connect(**kwargs))

    return wrap