test-commandserver: split helper functions to new hgclient module
This prepares for porting test-commandserver.py to .t test.
Though command-server test needs many Python codes, .t test will be more
readable than .py test thanks to inlined output.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/contrib/hgclient.py Sun Sep 28 13:31:16 2014 +0900
@@ -0,0 +1,75 @@
+# A minimal client for Mercurial's command server
+
+import sys, struct, subprocess, cStringIO
+
+def connect(path=None):
+ cmdline = ['hg', 'serve', '--cmdserver', 'pipe']
+ if path:
+ cmdline += ['-R', path]
+
+ server = subprocess.Popen(cmdline, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE)
+
+ return server
+
+def writeblock(server, data):
+ server.stdin.write(struct.pack('>I', len(data)))
+ server.stdin.write(data)
+ server.stdin.flush()
+
+def readchannel(server):
+ data = server.stdout.read(5)
+ if not data:
+ raise EOFError
+ channel, length = struct.unpack('>cI', data)
+ if channel in 'IL':
+ return channel, length
+ else:
+ return channel, server.stdout.read(length)
+
+def sep(text):
+ return text.replace('\\', '/')
+
+def runcommand(server, args, output=sys.stdout, error=sys.stderr, input=None,
+ outfilter=lambda x: x):
+ print ' runcommand', ' '.join(args)
+ sys.stdout.flush()
+ server.stdin.write('runcommand\n')
+ writeblock(server, '\0'.join(args))
+
+ if not input:
+ input = cStringIO.StringIO()
+
+ while True:
+ ch, data = readchannel(server)
+ if ch == 'o':
+ output.write(outfilter(data))
+ output.flush()
+ elif ch == 'e':
+ error.write(data)
+ error.flush()
+ elif ch == 'I':
+ writeblock(server, input.read(data))
+ elif ch == 'L':
+ writeblock(server, input.readline(data))
+ elif ch == 'r':
+ ret, = struct.unpack('>i', data)
+ if ret != 0:
+ print ' [%d]' % ret
+ return ret
+ else:
+ print "unexpected channel %c: %r" % (ch, data)
+ if ch.isupper():
+ return
+
+def check(func, repopath=None):
+ print
+ print 'testing %s:' % func.__name__
+ print
+ sys.stdout.flush()
+ server = connect(repopath)
+ try:
+ return func(server)
+ finally:
+ server.stdin.close()
+ server.wait()
--- a/tests/test-commandserver.py Sun Sep 28 15:13:35 2014 +0900
+++ b/tests/test-commandserver.py Sun Sep 28 13:31:16 2014 +0900
@@ -1,76 +1,7 @@
-import sys, os, struct, subprocess, cStringIO, re, shutil
-
-def connect(path=None):
- cmdline = ['hg', 'serve', '--cmdserver', 'pipe']
- if path:
- cmdline += ['-R', path]
-
- server = subprocess.Popen(cmdline, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE)
-
- return server
-
-def writeblock(server, data):
- server.stdin.write(struct.pack('>I', len(data)))
- server.stdin.write(data)
- server.stdin.flush()
-
-def readchannel(server):
- data = server.stdout.read(5)
- if not data:
- raise EOFError
- channel, length = struct.unpack('>cI', data)
- if channel in 'IL':
- return channel, length
- else:
- return channel, server.stdout.read(length)
-
-def sep(text):
- return text.replace('\\', '/')
-
-def runcommand(server, args, output=sys.stdout, error=sys.stderr, input=None,
- outfilter=lambda x: x):
- print ' runcommand', ' '.join(args)
- sys.stdout.flush()
- server.stdin.write('runcommand\n')
- writeblock(server, '\0'.join(args))
+import sys, os, cStringIO, re, shutil
- if not input:
- input = cStringIO.StringIO()
-
- while True:
- ch, data = readchannel(server)
- if ch == 'o':
- output.write(outfilter(data))
- output.flush()
- elif ch == 'e':
- error.write(data)
- error.flush()
- elif ch == 'I':
- writeblock(server, input.read(data))
- elif ch == 'L':
- writeblock(server, input.readline(data))
- elif ch == 'r':
- ret, = struct.unpack('>i', data)
- if ret != 0:
- print ' [%d]' % ret
- return ret
- else:
- print "unexpected channel %c: %r" % (ch, data)
- if ch.isupper():
- return
-
-def check(func, repopath=None):
- print
- print 'testing %s:' % func.__name__
- print
- sys.stdout.flush()
- server = connect(repopath)
- try:
- return func(server)
- finally:
- server.stdin.close()
- server.wait()
+sys.path.insert(0, os.path.join(os.environ['TESTDIR'], '..', 'contrib'))
+from hgclient import readchannel, sep, runcommand, check
def unknowncommand(server):
server.stdin.write('unknowncommand\n')