# HG changeset patch # User Yuya Nishihara # Date 1411878676 -32400 # Node ID 480b7fefbb089ff07e2220c119f235a6786faf19 # Parent 8d45a42b0c0f53f7c7e9151f828ecdf39fd02b74 test-commandserver: split helper functions to new hgclient module This prepares for porting test-commandserver.py to .t test. Though command-server test needs many Python codes, .t test will be more readable than .py test thanks to inlined output. diff -r 8d45a42b0c0f -r 480b7fefbb08 contrib/hgclient.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/contrib/hgclient.py Sun Sep 28 13:31:16 2014 +0900 @@ -0,0 +1,75 @@ +# A minimal client for Mercurial's command server + +import sys, struct, subprocess, cStringIO + +def connect(path=None): + cmdline = ['hg', 'serve', '--cmdserver', 'pipe'] + if path: + cmdline += ['-R', path] + + server = subprocess.Popen(cmdline, stdin=subprocess.PIPE, + stdout=subprocess.PIPE) + + return server + +def writeblock(server, data): + server.stdin.write(struct.pack('>I', len(data))) + server.stdin.write(data) + server.stdin.flush() + +def readchannel(server): + data = server.stdout.read(5) + if not data: + raise EOFError + channel, length = struct.unpack('>cI', data) + if channel in 'IL': + return channel, length + else: + return channel, server.stdout.read(length) + +def sep(text): + return text.replace('\\', '/') + +def runcommand(server, args, output=sys.stdout, error=sys.stderr, input=None, + outfilter=lambda x: x): + print ' runcommand', ' '.join(args) + sys.stdout.flush() + server.stdin.write('runcommand\n') + writeblock(server, '\0'.join(args)) + + if not input: + input = cStringIO.StringIO() + + while True: + ch, data = readchannel(server) + if ch == 'o': + output.write(outfilter(data)) + output.flush() + elif ch == 'e': + error.write(data) + error.flush() + elif ch == 'I': + writeblock(server, input.read(data)) + elif ch == 'L': + writeblock(server, input.readline(data)) + elif ch == 'r': + ret, = struct.unpack('>i', data) + if ret != 0: + print ' [%d]' % ret + return ret + else: + print "unexpected channel %c: %r" % (ch, data) + if ch.isupper(): + return + +def check(func, repopath=None): + print + print 'testing %s:' % func.__name__ + print + sys.stdout.flush() + server = connect(repopath) + try: + return func(server) + finally: + server.stdin.close() + server.wait() diff -r 8d45a42b0c0f -r 480b7fefbb08 tests/test-commandserver.py --- a/tests/test-commandserver.py Sun Sep 28 15:13:35 2014 +0900 +++ b/tests/test-commandserver.py Sun Sep 28 13:31:16 2014 +0900 @@ -1,76 +1,7 @@ -import sys, os, struct, subprocess, cStringIO, re, shutil - -def connect(path=None): - cmdline = ['hg', 'serve', '--cmdserver', 'pipe'] - if path: - cmdline += ['-R', path] - - server = subprocess.Popen(cmdline, stdin=subprocess.PIPE, - stdout=subprocess.PIPE) - - return server - -def writeblock(server, data): - server.stdin.write(struct.pack('>I', len(data))) - server.stdin.write(data) - server.stdin.flush() - -def readchannel(server): - data = server.stdout.read(5) - if not data: - raise EOFError - channel, length = struct.unpack('>cI', data) - if channel in 'IL': - return channel, length - else: - return channel, server.stdout.read(length) - -def sep(text): - return text.replace('\\', '/') - -def runcommand(server, args, output=sys.stdout, error=sys.stderr, input=None, - outfilter=lambda x: x): - print ' runcommand', ' '.join(args) - sys.stdout.flush() - server.stdin.write('runcommand\n') - writeblock(server, '\0'.join(args)) +import sys, os, cStringIO, re, shutil - if not input: - input = cStringIO.StringIO() - - while True: - ch, data = readchannel(server) - if ch == 'o': - output.write(outfilter(data)) - output.flush() - elif ch == 'e': - error.write(data) - error.flush() - elif ch == 'I': - writeblock(server, input.read(data)) - elif ch == 'L': - writeblock(server, input.readline(data)) - elif ch == 'r': - ret, = struct.unpack('>i', data) - if ret != 0: - print ' [%d]' % ret - return ret - else: - print "unexpected channel %c: %r" % (ch, data) - if ch.isupper(): - return - -def check(func, repopath=None): - print - print 'testing %s:' % func.__name__ - print - sys.stdout.flush() - server = connect(repopath) - try: - return func(server) - finally: - server.stdin.close() - server.wait() +sys.path.insert(0, os.path.join(os.environ['TESTDIR'], '..', 'contrib')) +from hgclient import readchannel, sep, runcommand, check def unknowncommand(server): server.stdin.write('unknowncommand\n')