view hgext/gpg.py @ 16324:46b991a1f428

record: allow splitting of hunks by manually editing patches

It is possible that unrelated changes in a file are on sequential lines. The current record extension does not allow these to be committed independently.

An example use case for this is in software development for deeply embedded real-time systems. In these environments, it is not always possible to use a debugger (due to time constraints) and hence inline UART-based printing is often used. When fixing a bug in a module, it is often convenient to add a large number of 'printf's (linked to the UART via a custom fputc) to the module in order to work out what is going wrong. printf is a very slow function (and also variadic, so somewhat frowned upon by the MISRA standard) and hence it is highly undesirable to commit these lines to the repository. If only a partial fix is implemented, however, it is desirable to commit the fix without deleting all of the printf lines. This also simplifies removal of the printf lines: once the final fix is committed, 'hg revert' does the rest. It is likely that the printf lines will be very near the actual fix, so being able to split the hunk is very useful in this case.

There were two alternatives I considered for the user interface. One was to manually edit the patch, the other to allow a hunk to be split into individual lines for consideration. The latter option would require a significant refactor of the record module and is less flexible. While the former is potentially more complicated to use, this is a feature that is likely to be used only in certain exceptional cases (such as the use case proposed above) and hence I felt that the complexity would not be a considerable issue.

I've also written a follow-up patch that refactors the 'prompt' code to base everything on the choices variable. This tidies up and clarifies the code a bit (it removes constructs like 'if ret == 7' and removes the 'e' option from the file scope options, as it's not relevant there). It's not really a necessity, so I've excluded it from this submission for now, but I can send it separately if there's a desire and it's on bitbucket (see below) in the meantime.

Possible future improvements include:

* Tidying up the 'prompt' code to base everything on the choices variable. This would allow entries to be removed from the prompt, as currently 'e' is offered even for entire file patches, which is currently unsupported.
* Allowing the entire file (or even multi-file) patch to be edited manually: this would require quite a large refactor without much benefit, so I decided to exclude it from the initial submission.
* Allowing the option to retry if a patch fails to apply (this is what Git does). This would require quite a bit of refactoring given the current 'hg record' implementation, so it's debatable whether it's worth it.

Output is similar to the existing record user interface except that an additional option ('e') exists to allow manual editing of the patch. This opens the user's configured editor with the patch. A comment is added to the bottom of the patch explaining what to do (based on Git's one).

A large proportion of the changeset is test-case changes to update the options reported by record (Ynesfdaq? instead of Ynsfdaq?). Functional changes are in record.py and there are some new test cases in test-record.t.
author A. S. Budden <abudden@gmail.com>
date Fri, 30 Mar 2012 22:08:46 +0100
parents f3ba4125d9e9
children cfb6682961b8

# Copyright 2005, 2006 Benoit Boissinot <benoit.boissinot@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

'''commands to sign and verify changesets'''

import os, tempfile, binascii
from mercurial import util, commands, match, cmdutil
from mercurial import node as hgnode
from mercurial.i18n import _

cmdtable = {}
command = cmdutil.command(cmdtable)

class gpg(object):
    def __init__(self, path, key=None):
        self.path = path
        self.key = (key and " --local-user \"%s\"" % key) or ""

    def sign(self, data):
        gpgcmd = "%s --sign --detach-sign%s" % (self.path, self.key)
        return util.filter(data, gpgcmd)

    def verify(self, data, sig):
        """return (err, keys) for the good and bad signatures of data"""
        sigfile = datafile = None
        try:
            # create temporary files
            fd, sigfile = tempfile.mkstemp(prefix="hg-gpg-", suffix=".sig")
            fp = os.fdopen(fd, 'wb')
            fp.write(sig)
            fp.close()
            fd, datafile = tempfile.mkstemp(prefix="hg-gpg-", suffix=".txt")
            fp = os.fdopen(fd, 'wb')
            fp.write(data)
            fp.close()
            gpgcmd = ("%s --logger-fd 1 --status-fd 1 --verify "
                      "\"%s\" \"%s\"" % (self.path, sigfile, datafile))
            ret = util.filter("", gpgcmd)
        finally:
            for f in (sigfile, datafile):
                try:
                    if f:
                        os.unlink(f)
                except OSError:
                    pass
        keys = []
        key, fingerprint = None, None
        err = ""
        for l in ret.splitlines():
            # see DETAILS in the gnupg documentation
            # filter the logger output
            if not l.startswith("[GNUPG:]"):
                continue
            l = l[9:]
            if l.startswith("ERRSIG"):
                err = _("error while verifying signature")
                break
            elif l.startswith("VALIDSIG"):
                # fingerprint of the primary key
                fingerprint = l.split()[10]
            elif (l.startswith("GOODSIG") or
                  l.startswith("EXPSIG") or
                  l.startswith("EXPKEYSIG") or
                  l.startswith("BADSIG")):
                if key is not None:
                    keys.append(key + [fingerprint])
                key = l.split(" ", 2)
                fingerprint = None
        if err:
            return err, []
        if key is not None:
            keys.append(key + [fingerprint])
        return err, keys
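
# The status-line handling in verify() above can be tried without running
# gpg. The following is a minimal standalone sketch, not part of the
# extension: parse_status_lines is a hypothetical helper and the sample
# status text (key id, user id, fingerprints) is made up for illustration.
# It matches status keywords exactly, whereas verify() uses startswith().

```python
def parse_status_lines(output):
    """Return (err, keys) from GnuPG --status-fd text (hypothetical helper)."""
    marker = "[GNUPG:] "
    keys = []
    key, fingerprint = None, None
    for line in output.splitlines():
        # skip the human-readable logger output
        if not line.startswith(marker):
            continue
        body = line[len(marker):]
        fields = body.split()
        if not fields:
            continue
        status = fields[0]
        if status == "ERRSIG":
            return "error while verifying signature", []
        elif status == "VALIDSIG":
            # the tenth VALIDSIG argument is the primary key fingerprint
            fingerprint = fields[10]
        elif status in ("GOODSIG", "EXPSIG", "EXPKEYSIG", "BADSIG"):
            # a new signature starts: flush the previous one, if any
            if key is not None:
                keys.append(key + [fingerprint])
            # [status, keyid, user id]; the user id may contain spaces
            key = body.split(" ", 2)
            fingerprint = None
    if key is not None:
        keys.append(key + [fingerprint])
    return "", keys


# made-up sample output: one logger line and two status lines
sample = "\n".join([
    "gpg: some logger output that is ignored",
    "[GNUPG:] GOODSIG 0123456789ABCDEF Alice Example <alice@example.org>",
    "[GNUPG:] VALIDSIG " + " ".join(
        ["F" * 40, "2012-03-30", "1333141726", "0", "4", "0", "1", "2",
         "00", "A" * 40]),
])
err, keys = parse_status_lines(sample)
```

# Each entry in keys has the shape [status, keyid, user, fingerprint], which
# is what getkeys() below unpacks by index.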

def newgpg(ui, **opts):
    """create a new gpg instance"""
    gpgpath = ui.config("gpg", "cmd", "gpg")
    gpgkey = opts.get('key')
    if not gpgkey:
        gpgkey = ui.config("gpg", "key", None)
    return gpg(gpgpath, gpgkey)

def sigwalk(repo):
    """
    walk over every signature, yielding pairs of
    ((node, version, sig), (filename, linenumber))
    """
    def parsefile(fileiter, context):
        ln = 1
        for l in fileiter:
            # count blank lines too, so that ln matches the line in the file
            if l.strip():
                yield (l.split(" ", 2), (context, ln))
            ln += 1

    # read the heads
    fl = repo.file(".hgsigs")
    for r in reversed(fl.heads()):
        fn = ".hgsigs|%s" % hgnode.short(r)
        for item in parsefile(fl.read(r).splitlines(), fn):
            yield item
    try:
        # read local signatures
        fn = "localsigs"
        for item in parsefile(repo.opener(fn), fn):
            yield item
    except IOError:
        pass
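
# The record format walked by sigwalk() is one signature per line:
# "<hex node> <version> <base64 signature>". A minimal standalone sketch of
# that parsing follows; parse_sig_lines is a hypothetical helper and the
# sample records are made up. It assumes well-formed lines with all three
# fields present.

```python
def parse_sig_lines(lines, context):
    """Yield ((node, version, sig), (context, lineno)) (hypothetical helper)."""
    for lineno, line in enumerate(lines, 1):
        line = line.strip()
        if not line:
            # blank lines are skipped but still counted
            continue
        node, version, sig = line.split(" ", 2)
        yield (node, version, sig), (context, lineno)


# made-up sample: two records separated by a blank line
sample_lines = [
    "a" * 40 + " 0 c2lnLW9uZQ==\n",
    "\n",
    "b" * 40 + " 0 c2lnLXR3bw==\n",
]
records = list(parse_sig_lines(sample_lines, "localsigs"))
```

# The (context, lineno) pair is only used to prefix warnings, so that a bad
# record can be located in .hgsigs or localsigs.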

def getkeys(ui, repo, mygpg, sigdata, context):
    """get the keys that signed the given data"""
    fn, ln = context
    node, version, sig = sigdata
    prefix = "%s:%d" % (fn, ln)
    node = hgnode.bin(node)

    data = node2txt(repo, node, version)
    sig = binascii.a2b_base64(sig)
    err, keys = mygpg.verify(data, sig)
    if err:
        ui.warn("%s:%d %s\n" % (fn, ln, err))
        return None

    validkeys = []
    # warn for expired key and/or sigs
    for key in keys:
        if key[0] == "BADSIG":
            ui.write(_("%s Bad signature from \"%s\"\n") % (prefix, key[2]))
            continue
        if key[0] == "EXPSIG":
            ui.write(_("%s Note: Signature has expired"
                       " (signed by: \"%s\")\n") % (prefix, key[2]))
        elif key[0] == "EXPKEYSIG":
            ui.write(_("%s Note: This key has expired"
                       " (signed by: \"%s\")\n") % (prefix, key[2]))
        validkeys.append((key[1], key[2], key[3]))
    return validkeys
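
# The filtering rule in getkeys() above is: BADSIG entries are reported and
# dropped, while EXPSIG and EXPKEYSIG entries are reported but still kept.
# A minimal standalone sketch of that rule follows; filter_keys is a
# hypothetical helper that collects notes in a list instead of calling
# ui.write(), and the sample entries are made up.

```python
def filter_keys(keys):
    """Split verify()-style entries into (validkeys, notes)."""
    validkeys, notes = [], []
    for status, keyid, user, fingerprint in keys:
        if status == "BADSIG":
            notes.append('Bad signature from "%s"' % user)
            continue
        if status == "EXPSIG":
            notes.append('Signature has expired (signed by: "%s")' % user)
        elif status == "EXPKEYSIG":
            notes.append('This key has expired (signed by: "%s")' % user)
        validkeys.append((keyid, user, fingerprint))
    return validkeys, notes


# made-up sample entries in the [status, keyid, user, fingerprint] shape
sample_keys = [
    ["GOODSIG", "1111", "Alice", "A" * 40],
    ["BADSIG", "2222", "Bob", None],
    ["EXPKEYSIG", "3333", "Carol", "C" * 40],
]
validkeys, notes = filter_keys(sample_keys)
```

# Only the (keyid, user, fingerprint) triples survive, which is the shape
# keystr() expects.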

@command("sigs", [], _('hg sigs'))
def sigs(ui, repo):
    """list signed changesets"""
    mygpg = newgpg(ui)
    revs = {}

    for data, context in sigwalk(repo):
        node, version, sig = data
        fn, ln = context
        try:
            n = repo.lookup(node)
        except KeyError:
            ui.warn(_("%s:%d node does not exist\n") % (fn, ln))
            continue
        r = repo.changelog.rev(n)
        keys = getkeys(ui, repo, mygpg, data, context)
        if not keys:
            continue
        revs.setdefault(r, [])
        revs[r].extend(keys)
    for rev in sorted(revs, reverse=True):
        for k in revs[rev]:
            r = "%5d:%s" % (rev, hgnode.hex(repo.changelog.node(rev)))
            ui.write("%-30s %s\n" % (keystr(ui, k), r))

@command("sigcheck", [], _('hg sigcheck REVISION'))
def check(ui, repo, rev):
    """verify all the signatures there may be for a particular revision"""
    mygpg = newgpg(ui)
    rev = repo.lookup(rev)
    hexrev = hgnode.hex(rev)
    keys = []

    for data, context in sigwalk(repo):
        node, version, sig = data
        if node == hexrev:
            k = getkeys(ui, repo, mygpg, data, context)
            if k:
                keys.extend(k)

    if not keys:
        ui.write(_("No valid signature for %s\n") % hgnode.short(rev))
        return

    # print summary
    ui.write("%s is signed by:\n" % hgnode.short(rev))
    for key in keys:
        ui.write(" %s\n" % keystr(ui, key))

def keystr(ui, key):
    """return a display string for a key: user and optional comment"""
    keyid, user, fingerprint = key
    comment = ui.config("gpg", fingerprint, None)
    if comment:
        return "%s (%s)" % (user, comment)
    else:
        return user

@command("sign",
         [('l', 'local', None, _('make the signature local')),
          ('f', 'force', None, _('sign even if the sigfile is modified')),
          ('', 'no-commit', None, _('do not commit the sigfile after signing')),
          ('k', 'key', '',
           _('the key id to sign with'), _('ID')),
          ('m', 'message', '',
           _('commit message'), _('TEXT')),
         ] + commands.commitopts2,
         _('hg sign [OPTION]... [REVISION]...'))
def sign(ui, repo, *revs, **opts):
    """add a signature for the current or given revision

    If no revision is given, the parent of the working directory is used,
    or tip if no revision is checked out.

    See :hg:`help dates` for a list of formats valid for -d/--date.
    """

    mygpg = newgpg(ui, **opts)
    sigver = "0"
    sigmessage = ""

    date = opts.get('date')
    if date:
        opts['date'] = util.parsedate(date)

    if revs:
        nodes = [repo.lookup(n) for n in revs]
    else:
        nodes = [node for node in repo.dirstate.parents()
                 if node != hgnode.nullid]
        if len(nodes) > 1:
            raise util.Abort(_('uncommitted merge - please provide a '
                               'specific revision'))
        if not nodes:
            nodes = [repo.changelog.tip()]

    for n in nodes:
        hexnode = hgnode.hex(n)
        ui.write(_("Signing %d:%s\n") % (repo.changelog.rev(n),
                                         hgnode.short(n)))
        # build data
        data = node2txt(repo, n, sigver)
        sig = mygpg.sign(data)
        if not sig:
            raise util.Abort(_("error while signing"))
        sig = binascii.b2a_base64(sig)
        sig = sig.replace("\n", "")
        sigmessage += "%s %s %s\n" % (hexnode, sigver, sig)

    # write it
    if opts['local']:
        repo.opener.append("localsigs", sigmessage)
        return

    msigs = match.exact(repo.root, '', ['.hgsigs'])
    s = repo.status(match=msigs, unknown=True, ignored=True)[:6]
    if util.any(s) and not opts["force"]:
        raise util.Abort(_("working copy of .hgsigs is changed "
                           "(please commit .hgsigs manually "
                           "or use --force)"))

    sigsfile = repo.wfile(".hgsigs", "ab")
    sigsfile.write(sigmessage)
    sigsfile.close()

    if '.hgsigs' not in repo.dirstate:
        repo[None].add([".hgsigs"])

    if opts["no_commit"]:
        return

    message = opts['message']
    if not message:
        # we don't translate commit messages
        message = "\n".join(["Added signature for changeset %s"
                             % hgnode.short(n)
                             for n in nodes])
    try:
        repo.commit(message, opts['user'], opts['date'], match=msigs)
    except ValueError, inst:
        raise util.Abort(str(inst))

def node2txt(repo, node, ver):
    """map a node to the text that gets signed"""
    if ver == "0":
        return "%s\n" % hgnode.hex(node)
    else:
        raise util.Abort(_("unknown signature version"))
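
# The record that sign() writes is "<hex node> <version> <base64 sig>" on a
# single line, and getkeys() reverses the base64 step before verifying.
# A minimal standalone sketch of that round trip follows; format_sig_line
# and decode_sig_line are hypothetical helpers, and the "signature" bytes
# are a made-up placeholder rather than real gpg output.

```python
import binascii


def format_sig_line(hexnode, sigver, rawsig):
    """Build one signature record from raw signature bytes."""
    # b2a_base64 appends a newline; strip it so the record stays on one line
    b64 = binascii.b2a_base64(rawsig).decode("ascii").replace("\n", "")
    return "%s %s %s\n" % (hexnode, sigver, b64)


def decode_sig_line(line):
    """Split a record back into (hexnode, sigver, raw signature bytes)."""
    hexnode, sigver, b64 = line.split(" ", 2)
    return hexnode, sigver, binascii.a2b_base64(b64)


# made-up node and placeholder signature bytes
hexnode = "46b991a1f428" + "0" * 28
rawsig = b"\x00\x01placeholder-signature-bytes"
line = format_sig_line(hexnode, "0", rawsig)
decoded = decode_sig_line(line)
```

# With version "0", the signed text itself is just the hex node followed by
# a newline, as node2txt() above shows.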