diff contrib/hgclient.py @ 22566:480b7fefbb08

test-commandserver: split helper functions to new hgclient module This prepares for porting test-commandserver.py to .t test. Though command-server test needs many Python codes, .t test will be more readable than .py test thanks to inlined output.
author Yuya Nishihara <yuya@tcha.org>
date Sun, 28 Sep 2014 13:31:16 +0900
parents
children db497a1ef1c1
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/contrib/hgclient.py	Sun Sep 28 13:31:16 2014 +0900
@@ -0,0 +1,75 @@
+# A minimal client for Mercurial's command server
+
+import sys, struct, subprocess, cStringIO
+
+def connect(path=None):
+    cmdline = ['hg', 'serve', '--cmdserver', 'pipe']
+    if path:
+        cmdline += ['-R', path]
+
+    server = subprocess.Popen(cmdline, stdin=subprocess.PIPE,
+                              stdout=subprocess.PIPE)
+
+    return server
+
+def writeblock(server, data):
+    server.stdin.write(struct.pack('>I', len(data)))
+    server.stdin.write(data)
+    server.stdin.flush()
+
+def readchannel(server):
+    data = server.stdout.read(5)
+    if not data:
+        raise EOFError
+    channel, length = struct.unpack('>cI', data)
+    if channel in 'IL':
+        return channel, length
+    else:
+        return channel, server.stdout.read(length)
+
+def sep(text):
+    return text.replace('\\', '/')
+
+def runcommand(server, args, output=sys.stdout, error=sys.stderr, input=None,
+               outfilter=lambda x: x):
+    print ' runcommand', ' '.join(args)
+    sys.stdout.flush()
+    server.stdin.write('runcommand\n')
+    writeblock(server, '\0'.join(args))
+
+    if not input:
+        input = cStringIO.StringIO()
+
+    while True:
+        ch, data = readchannel(server)
+        if ch == 'o':
+            output.write(outfilter(data))
+            output.flush()
+        elif ch == 'e':
+            error.write(data)
+            error.flush()
+        elif ch == 'I':
+            writeblock(server, input.read(data))
+        elif ch == 'L':
+            writeblock(server, input.readline(data))
+        elif ch == 'r':
+            ret, = struct.unpack('>i', data)
+            if ret != 0:
+                print ' [%d]' % ret
+            return ret
+        else:
+            print "unexpected channel %c: %r" % (ch, data)
+            if ch.isupper():
+                return
+
+def check(func, repopath=None):
+    print
+    print 'testing %s:' % func.__name__
+    print
+    sys.stdout.flush()
+    server = connect(repopath)
+    try:
+        return func(server)
+    finally:
+        server.stdin.close()
+        server.wait()