Mercurial > python-hglib
changeset 0:79f88b4db15f
Initial commit
author | Idan Kamara <idankk86@gmail.com> |
---|---|
date | Wed, 20 Jul 2011 16:09:34 -0500 |
parents | |
children | bbd294291dd8 |
files | .hgignore LICENSE Makefile hglib/error.py hglib/hglib.py hglib/styles/rev.style hglib/util.py tests/test-hglib.py |
diffstat | 8 files changed, 484 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.hgignore Wed Jul 20 16:09:34 2011 -0500 @@ -0,0 +1,6 @@ +syntax: glob + +*.pyc +*.orig +*.rej +*~
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/LICENSE Wed Jul 20 16:09:34 2011 -0500 @@ -0,0 +1,20 @@ +Copyright (c) 2011 Matt Mackall and other contributors + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Makefile Wed Jul 20 16:09:34 2011 -0500 @@ -0,0 +1,11 @@ +PYTHON=python +help: + @echo 'Commonly used make targets:' + @echo ' tests - run all tests in the automatic test suite' + +all: help + +.PHONY: tests + +tests: + cd tests && $(PYTHON) $(HGREPO)/tests/run-tests.py -l $(TESTFLAGS)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/hglib/error.py Wed Jul 20 16:09:34 2011 -0500 @@ -0,0 +1,15 @@ +class CommandError(Exception): + def __init__(self, args, ret, out, err): + self.args = args + self.ret = ret + self.out = out + self.err = err + +class ServerError(Exception): + pass + +class ResponseError(ServerError, ValueError): + pass + +class CapabilityError(ServerError): + pass
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/hglib/hglib.py Wed Jul 20 16:09:34 2011 -0500 @@ -0,0 +1,311 @@ +import subprocess, os, struct, cStringIO, collections +import error, util + +HGPATH = 'hg' + +def connect(path=None, encoding=None, configs=None): + ''' starts a cmdserver for the given path (or for a repository found in the + cwd). HGENCODING is set to the given encoding. configs is a list of key, value, + similar to those passed to hg --config. ''' + return hgclient(path, encoding, configs) + +class hgclient(object): + inputfmt = '>I' + outputfmt = '>cI' + outputfmtsize = struct.calcsize(outputfmt) + retfmt = '>i' + + # XXX fix this hack + _stylesdir = os.path.join(os.path.dirname(__file__), 'styles') + revstyle = ['--style', os.path.join(_stylesdir, 'rev.style')] + + revision = collections.namedtuple('revision', 'rev, node, tags, ' + 'branch, author, desc') + + def __init__(self, path, encoding, configs): + args = [HGPATH, 'serve', '--cmdserver', 'pipe'] + if path: + args += ['-R', path] + if configs: + args += ['--config'] + configs + env = dict(os.environ) + if encoding: + env['HGENCODING'] = encoding + + self.server = subprocess.Popen(args, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, env=env) + + self._readhello() + self._config = {} + + def _readhello(self): + """ read the hello message the server sends when started """ + ch, msg = self._readchannel() + assert ch == 'o' + + msg = msg.split('\n') + + self.capabilities = msg[0][len('capabilities: '):] + if not self.capabilities: + raise error.ResponseError("bad hello message: expected 'capabilities: '" + ", got %r" % msg[0]) + + self.capabilities = set(self.capabilities.split()) + + # at the very least the server should be able to run commands + assert 'runcommand' in self.capabilities + + self._encoding = msg[1][len('encoding: '):] + if not self._encoding: + raise error.ResponseError("bad hello message: expected 'encoding: '" + ", got %r" % msg[1]) + + def _readchannel(self): + data = self.server.stdout.read(hgclient.outputfmtsize) + if not data: + raise error.ServerError() + channel, length = struct.unpack(hgclient.outputfmt, data) + if channel in 'IL': + return channel, length + else: + return channel, self.server.stdout.read(length) + + def _parserevs(self, splitted): + ''' splitted is a list of fields according to our rev.style, where each 6 + fields compose one revision. ''' + return [self.revision._make(rev) for rev in util.grouper(6, splitted)] + + def _eatlines(self, s, n): + idx = 0 + for i in xrange(n): + idx = s.find('\n', idx) + 1 + + return s[idx:] + + def runcommand(self, args, inchannels, outchannels): + def writeblock(data): + self.server.stdin.write(struct.pack(self.inputfmt, len(data))) + self.server.stdin.write(data) + self.server.stdin.flush() + + if not self.server: + raise ValueError("server not connected") + + self.server.stdin.write('runcommand\n') + writeblock('\0'.join(args)) + + while True: + channel, data = self._readchannel() + + # input channels + if channel in inchannels: + writeblock(inchannels[channel](data)) + # output channels + elif channel in outchannels: + outchannels[channel](data) + # result channel, command finished + elif channel == 'r': + return struct.unpack(hgclient.retfmt, data)[0] + # a channel that we don't know and can't ignore + elif channel.isupper(): + raise error.ResponseError("unexpected data on required channel '%s'" + % channel) + # optional channel + else: + pass + + def outputruncommand(self, args, inchannels = {}, raiseonerror=True): + ''' run the command specified by args, returning (ret, output, error) ''' + out, err = cStringIO.StringIO(), cStringIO.StringIO() + outchannels = {'o' : out.write, 'e' : err.write} + ret = self.runcommand(args, inchannels, outchannels) + if ret and raiseonerror: + raise error.CommandError(args, ret, out.getvalue(), err.getvalue()) + return ret, out.getvalue(), err.getvalue() + + def close(self): + self.server.stdin.close() + self.server.wait() + ret = self.server.returncode + self.server = None + return ret + + @property + def encoding(self): + """ get the servers encoding """ + if not 'getencoding' in self.capabilities: + raise CapabilityError('getencoding') + + if not self._encoding: + self.server.stdin.write('getencoding\n') + self._encoding = self._readfromchannel('r') + + return self._encoding + + def config(self, refresh=False): + if not self._config or refresh: + self._config.clear() + + ret, out, err = self.outputruncommand(['showconfig']) + if ret: + raise error.CommandError(['showconfig'], ret, out, err) + + for entry in cStringIO.StringIO(out): + k, v = entry.rstrip().split('=', 1) + section, name = k.split('.', 1) + self._config.setdefault(section, {})[name] = v + + return self._config + + def status(self): + ret, out = self.outputruncommand(['status', '-0']) + + d = dict((c, []) for c in 'MARC!?I') + + for entry in out.split('\0'): + if entry: + t, f = entry.split(' ', 1) + d[t].append(f) + + return d + + def log(self, revrange=None): + args = ['log'] + self.revstyle + if revrange: + args.append('-r') + args += revrange + + out = self.outputruncommand(args)[1] + out = out.split('\0')[:-1] + + return self._parserevs(out) + + def incoming(self, revrange=None, path=None): + args = ['incoming'] + self.revstyle + if revrange: + args.append('-r') + args += revrange + + if path: + args += [path] + + ret, out, err = self.outputruncommand(args, raiseonerror=False) + if not ret: + out = self._eatlines(out, 2).split('\0')[:-1] + return self._parserevs(out) + elif ret == 1: + return [] + else: + raise error.CommandError(args, ret, out, err) + + def outgoing(self, revrange=None, path=None): + args = ['outgoing'] + self.revstyle + if revrange: + args.append('-r') + args += revrange + + if path: + args += [path] + + ret, out, err = self.outputruncommand(args, raiseonerror=False) + if not ret: + out = self._eatlines(out, 2).split('\0')[:-1] + return self._parserevs(out) + elif ret == 1: + return [] + else: + raise error.CommandError(args, ret, out, err) + + def commit(self, message, addremove=False): + args = ['commit', '-m', message] + + if addremove: + args += ['-A'] + + self.outputruncommand(args) + + # hope the tip hasn't changed since we committed + return self.tip() + + def import_(self, patch): + if isinstance(patch, str): + fp = open(patch) + else: + assert hasattr(patch, 'read') + assert hasattr(patch, 'readline') + + fp = patch + + try: + inchannels = {'I' : fp.read, 'L' : fp.readline} + self.outputruncommand(['import', '-'], inchannels) + finally: + if fp != patch: + fp.close() + + def root(self): + return self.outputruncommand(['root'])[1].rstrip() + + def clone(self, source='.', dest=None, branch=None, updaterev=None, + revrange=None): + args = ['clone'] + + if branch: + args += ['-b', branch] + if updaterev: + args += ['-u', updaterev] + if revrange: + args.append('-r') + args += revrange + args.append(source) + + if dest: + args.append(dest) + + self.outputruncommand(args) + + def tip(self): + out = self.outputruncommand(['tip'] + self.revstyle)[1] + out = out.split('\0') + + return self._parserevs(out)[0] + + def branch(self, name=None): + if not name: + return self.outputruncommand(['branch'])[1].rstrip() + + def branches(self): + out = self.outputruncommand(['branches'])[1] + branches = {} + for line in out.rstrip().split('\n'): + branch, revnode = line.split() + branches[branch] = self.log(revrange=[revnode.split(':')[0]])[0] + + return branches + + def paths(self, name=None): + if not name: + out = self.outputruncommand(['paths'])[1] + if not out: + return {} + + return dict([s.split(' = ') for s in out.rstrip().split('\n')]) + else: + args = ['paths', name] + ret, out, err = self.outputruncommand(args, raiseonerror=False) + if ret: + raise error.CommandError(args, ret, out, err) + return out.rstrip() + + def cat(self, files, rev=None, output=None): + args = ['cat'] + if rev: + args += ['-r', rev] + if output: + args += ['-o', output] + + args += files + ret, out, err = self.outputruncommand(args) + + if not output: + return out
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/hglib/styles/rev.style Wed Jul 20 16:09:34 2011 -0500 @@ -0,0 +1,1 @@ +changeset = '{rev}\0{node}\0{tags}\0{branch}\0{author}\0{desc}\0'
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/hglib/util.py Wed Jul 20 16:09:34 2011 -0500 @@ -0,0 +1,6 @@ +import itertools + +def grouper(n, iterable): + ''' list(grouper(2, range(4))) -> [(0, 1), (2, 3)] ''' + args = [iter(iterable)] * n + return itertools.izip(*args)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test-hglib.py Wed Jul 20 16:09:34 2011 -0500 @@ -0,0 +1,114 @@ +#!/usr/bin/env python + +import unittest + +import sys, os, subprocess, cStringIO, shutil, tempfile + +# XXX fix this hack +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../') +from hglib import hglib + +class test_hglib(unittest.TestCase): + def setUp(self): + self._tmpdir = tempfile.mkdtemp() + os.chdir(self._tmpdir) + # until we can run norepo commands in the cmdserver + os.system('hg init') + self.client = hglib.connect() + + def tearDown(self): + shutil.rmtree(self._tmpdir) + + def append(self, path, *args): + f = open(path, 'a') + for a in args: + f.write(str(a)) + f.close() + + def test_log(self): + self.append('a', 'a') + rev0 = self.client.commit('first', addremove=True) + self.append('a', 'a') + rev1 = self.client.commit('second') + + revs = self.client.log() + revs.reverse() + + self.assertTrue(len(revs) == 2) + self.assertEquals(revs[1], rev1) + + self.assertEquals(revs[0], self.client.log('0')[0]) + + def test_outgoing_incoming(self): + self.append('a', 'a') + self.client.commit('first', addremove=True) + self.append('a', 'a') + self.client.commit('second') + + self.client.clone(dest='bar') + bar = hglib.connect('bar') + + self.assertEquals(self.client.log(), bar.log()) + self.assertEquals(self.client.outgoing(path='bar'), bar.incoming()) + + self.append('a', 'a') + rev = self.client.commit('third') + out = self.client.outgoing(path='bar') + + self.assertEquals(len(out), 1) + self.assertEquals(out[0], rev) + + self.assertEquals(out, bar.incoming()) + + def test_branch(self): + self.assertEquals(self.client.branch(), 'default') + self.append('a', 'a') + rev = self.client.commit('first', addremove=True) + branches = self.client.branches() + + self.assertEquals(rev, branches[rev.branch]) + + def test_encoding(self): + self.client = hglib.connect(encoding='utf-8') + self.assertEquals(self.client.encoding, 'utf-8') + + def test_paths(self): + open('.hg/hgrc', 'a').write('[paths]\nfoo = bar\n') + + # hgrc isn't watched for changes yet, have to reconnect + self.client = hglib.connect() + paths = self.client.paths() + self.assertEquals(len(paths), 1) + self.assertEquals(paths['foo'], os.path.abspath('bar')) + self.assertEquals(self.client.paths('foo'), os.path.abspath('bar')) + + def test_import(self): + patch = """ +# HG changeset patch +# User test +# Date 0 0 +# Node ID c103a3dec114d882c98382d684d8af798d09d857 +# Parent 0000000000000000000000000000000000000000 +1 + +diff -r 000000000000 -r c103a3dec114 a +--- /dev/null Thu Jan 01 00:00:00 1970 +0000 ++++ b/a Thu Jan 01 00:00:00 1970 +0000 +@@ -0,0 +1,1 @@ ++1 +""" + self.client.import_(cStringIO.StringIO(patch)) + self.assertEquals(self.client.cat(['a']), '1\n') + +if __name__ == '__main__': + stream = cStringIO.StringIO() + runner = unittest.TextTestRunner(stream=stream, verbosity=0) + + # XXX fix this + module = __import__('__main__') + loader = unittest.TestLoader() + ret = not runner.run(loader.loadTestsFromModule(module)).wasSuccessful() + if ret: + print stream.getvalue() + + sys.exit(ret)