view hgext/gpg.py @ 45095:8e04607023e5

procutil: ensure that procutil.std{out,err}.write() writes all bytes Python 3 offers different kind of streams and it’s not guaranteed for all of them that calling write() writes all bytes. When Python is started in unbuffered mode, sys.std{out,err}.buffer are instances of io.FileIO, whose write() can write less bytes for platform-specific reasons (e.g. Linux has a 0x7ffff000 bytes maximum and could write less if interrupted by a signal; when writing to Windows consoles, it’s limited to 32767 bytes to avoid the "not enough space" error). This can lead to silent loss of data, both when using sys.std{out,err}.buffer (which may in fact not be a buffered stream) and when using the text streams sys.std{out,err} (I’ve created a CPython bug report for that: https://bugs.python.org/issue41221). Python may fix the problem at some point. For now, we implement our own wrapper for procutil.std{out,err} that calls the raw stream’s write() method until all bytes have been written. We don’t use sys.std{out,err} for larger writes, so I think it’s not worth the effort to patch them.
author Manuel Jacob <me@manueljacob.de>
date Fri, 10 Jul 2020 12:27:58 +0200
parents 9f70512ae2cf
children 89a2afe31e82
line wrap: on
line source

# Copyright 2005, 2006 Benoit Boissinot <benoit.boissinot@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

'''commands to sign and verify changesets'''

from __future__ import absolute_import

import binascii
import os

from mercurial.i18n import _
from mercurial import (
    cmdutil,
    error,
    help,
    match,
    node as hgnode,
    pycompat,
    registrar,
)
from mercurial.utils import (
    dateutil,
    procutil,
)

cmdtable = {}
command = registrar.command(cmdtable)
# Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
# extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
# be specifying the version(s) of Mercurial they are tested with, or
# leave the attribute unspecified.
testedwith = b'ships-with-hg-core'

configtable = {}
configitem = registrar.configitem(configtable)

configitem(
    b'gpg', b'cmd', default=b'gpg',
)
configitem(
    b'gpg', b'key', default=None,
)
configitem(
    b'gpg', b'.*', default=None, generic=True,
)

# Custom help category
_HELP_CATEGORY = b'gpg'
help.CATEGORY_ORDER.insert(
    help.CATEGORY_ORDER.index(registrar.command.CATEGORY_HELP), _HELP_CATEGORY
)
help.CATEGORY_NAMES[_HELP_CATEGORY] = b'Signing changes (GPG)'


class gpg(object):
    def __init__(self, path, key=None):
        self.path = path
        self.key = (key and b" --local-user \"%s\"" % key) or b""

    def sign(self, data):
        gpgcmd = b"%s --sign --detach-sign%s" % (self.path, self.key)
        return procutil.filter(data, gpgcmd)

    def verify(self, data, sig):
        """ returns of the good and bad signatures"""
        sigfile = datafile = None
        try:
            # create temporary files
            fd, sigfile = pycompat.mkstemp(prefix=b"hg-gpg-", suffix=b".sig")
            fp = os.fdopen(fd, 'wb')
            fp.write(sig)
            fp.close()
            fd, datafile = pycompat.mkstemp(prefix=b"hg-gpg-", suffix=b".txt")
            fp = os.fdopen(fd, 'wb')
            fp.write(data)
            fp.close()
            gpgcmd = (
                b"%s --logger-fd 1 --status-fd 1 --verify \"%s\" \"%s\""
                % (self.path, sigfile, datafile,)
            )
            ret = procutil.filter(b"", gpgcmd)
        finally:
            for f in (sigfile, datafile):
                try:
                    if f:
                        os.unlink(f)
                except OSError:
                    pass
        keys = []
        key, fingerprint = None, None
        for l in ret.splitlines():
            # see DETAILS in the gnupg documentation
            # filter the logger output
            if not l.startswith(b"[GNUPG:]"):
                continue
            l = l[9:]
            if l.startswith(b"VALIDSIG"):
                # fingerprint of the primary key
                fingerprint = l.split()[10]
            elif l.startswith(b"ERRSIG"):
                key = l.split(b" ", 3)[:2]
                key.append(b"")
                fingerprint = None
            elif (
                l.startswith(b"GOODSIG")
                or l.startswith(b"EXPSIG")
                or l.startswith(b"EXPKEYSIG")
                or l.startswith(b"BADSIG")
            ):
                if key is not None:
                    keys.append(key + [fingerprint])
                key = l.split(b" ", 2)
                fingerprint = None
        if key is not None:
            keys.append(key + [fingerprint])
        return keys


def newgpg(ui, **opts):
    """create a new gpg instance"""
    gpgpath = ui.config(b"gpg", b"cmd")
    gpgkey = opts.get('key')
    if not gpgkey:
        gpgkey = ui.config(b"gpg", b"key")
    return gpg(gpgpath, gpgkey)


def sigwalk(repo):
    """
    walk over every sigs, yields a couple
    ((node, version, sig), (filename, linenumber))
    """

    def parsefile(fileiter, context):
        ln = 1
        for l in fileiter:
            if not l:
                continue
            yield (l.split(b" ", 2), (context, ln))
            ln += 1

    # read the heads
    fl = repo.file(b".hgsigs")
    for r in reversed(fl.heads()):
        fn = b".hgsigs|%s" % hgnode.short(r)
        for item in parsefile(fl.read(r).splitlines(), fn):
            yield item
    try:
        # read local signatures
        fn = b"localsigs"
        for item in parsefile(repo.vfs(fn), fn):
            yield item
    except IOError:
        pass


def getkeys(ui, repo, mygpg, sigdata, context):
    """get the keys who signed a data"""
    fn, ln = context
    node, version, sig = sigdata
    prefix = b"%s:%d" % (fn, ln)
    node = hgnode.bin(node)

    data = node2txt(repo, node, version)
    sig = binascii.a2b_base64(sig)
    keys = mygpg.verify(data, sig)

    validkeys = []
    # warn for expired key and/or sigs
    for key in keys:
        if key[0] == b"ERRSIG":
            ui.write(_(b"%s Unknown key ID \"%s\"\n") % (prefix, key[1]))
            continue
        if key[0] == b"BADSIG":
            ui.write(_(b"%s Bad signature from \"%s\"\n") % (prefix, key[2]))
            continue
        if key[0] == b"EXPSIG":
            ui.write(
                _(b"%s Note: Signature has expired (signed by: \"%s\")\n")
                % (prefix, key[2])
            )
        elif key[0] == b"EXPKEYSIG":
            ui.write(
                _(b"%s Note: This key has expired (signed by: \"%s\")\n")
                % (prefix, key[2])
            )
        validkeys.append((key[1], key[2], key[3]))
    return validkeys


@command(b"sigs", [], _(b'hg sigs'), helpcategory=_HELP_CATEGORY)
def sigs(ui, repo):
    """list signed changesets"""
    mygpg = newgpg(ui)
    revs = {}

    for data, context in sigwalk(repo):
        node, version, sig = data
        fn, ln = context
        try:
            n = repo.lookup(node)
        except KeyError:
            ui.warn(_(b"%s:%d node does not exist\n") % (fn, ln))
            continue
        r = repo.changelog.rev(n)
        keys = getkeys(ui, repo, mygpg, data, context)
        if not keys:
            continue
        revs.setdefault(r, [])
        revs[r].extend(keys)
    for rev in sorted(revs, reverse=True):
        for k in revs[rev]:
            r = b"%5d:%s" % (rev, hgnode.hex(repo.changelog.node(rev)))
            ui.write(b"%-30s %s\n" % (keystr(ui, k), r))


@command(b"sigcheck", [], _(b'hg sigcheck REV'), helpcategory=_HELP_CATEGORY)
def sigcheck(ui, repo, rev):
    """verify all the signatures there may be for a particular revision"""
    mygpg = newgpg(ui)
    rev = repo.lookup(rev)
    hexrev = hgnode.hex(rev)
    keys = []

    for data, context in sigwalk(repo):
        node, version, sig = data
        if node == hexrev:
            k = getkeys(ui, repo, mygpg, data, context)
            if k:
                keys.extend(k)

    if not keys:
        ui.write(_(b"no valid signature for %s\n") % hgnode.short(rev))
        return

    # print summary
    ui.write(_(b"%s is signed by:\n") % hgnode.short(rev))
    for key in keys:
        ui.write(b" %s\n" % keystr(ui, key))


def keystr(ui, key):
    """associate a string to a key (username, comment)"""
    keyid, user, fingerprint = key
    comment = ui.config(b"gpg", fingerprint)
    if comment:
        return b"%s (%s)" % (user, comment)
    else:
        return user


@command(
    b"sign",
    [
        (b'l', b'local', None, _(b'make the signature local')),
        (b'f', b'force', None, _(b'sign even if the sigfile is modified')),
        (
            b'',
            b'no-commit',
            None,
            _(b'do not commit the sigfile after signing'),
        ),
        (b'k', b'key', b'', _(b'the key id to sign with'), _(b'ID')),
        (b'm', b'message', b'', _(b'use text as commit message'), _(b'TEXT')),
        (b'e', b'edit', False, _(b'invoke editor on commit messages')),
    ]
    + cmdutil.commitopts2,
    _(b'hg sign [OPTION]... [REV]...'),
    helpcategory=_HELP_CATEGORY,
)
def sign(ui, repo, *revs, **opts):
    """add a signature for the current or given revision

    If no revision is given, the parent of the working directory is used,
    or tip if no revision is checked out.

    The ``gpg.cmd`` config setting can be used to specify the command
    to run. A default key can be specified with ``gpg.key``.

    See :hg:`help dates` for a list of formats valid for -d/--date.
    """
    with repo.wlock():
        return _dosign(ui, repo, *revs, **opts)


def _dosign(ui, repo, *revs, **opts):
    mygpg = newgpg(ui, **opts)
    opts = pycompat.byteskwargs(opts)
    sigver = b"0"
    sigmessage = b""

    date = opts.get(b'date')
    if date:
        opts[b'date'] = dateutil.parsedate(date)

    if revs:
        nodes = [repo.lookup(n) for n in revs]
    else:
        nodes = [
            node for node in repo.dirstate.parents() if node != hgnode.nullid
        ]
        if len(nodes) > 1:
            raise error.Abort(
                _(b'uncommitted merge - please provide a specific revision')
            )
        if not nodes:
            nodes = [repo.changelog.tip()]

    for n in nodes:
        hexnode = hgnode.hex(n)
        ui.write(
            _(b"signing %d:%s\n") % (repo.changelog.rev(n), hgnode.short(n))
        )
        # build data
        data = node2txt(repo, n, sigver)
        sig = mygpg.sign(data)
        if not sig:
            raise error.Abort(_(b"error while signing"))
        sig = binascii.b2a_base64(sig)
        sig = sig.replace(b"\n", b"")
        sigmessage += b"%s %s %s\n" % (hexnode, sigver, sig)

    # write it
    if opts[b'local']:
        repo.vfs.append(b"localsigs", sigmessage)
        return

    if not opts[b"force"]:
        msigs = match.exact([b'.hgsigs'])
        if any(repo.status(match=msigs, unknown=True, ignored=True)):
            raise error.Abort(
                _(b"working copy of .hgsigs is changed "),
                hint=_(b"please commit .hgsigs manually"),
            )

    sigsfile = repo.wvfs(b".hgsigs", b"ab")
    sigsfile.write(sigmessage)
    sigsfile.close()

    if b'.hgsigs' not in repo.dirstate:
        repo[None].add([b".hgsigs"])

    if opts[b"no_commit"]:
        return

    message = opts[b'message']
    if not message:
        # we don't translate commit messages
        message = b"\n".join(
            [
                b"Added signature for changeset %s" % hgnode.short(n)
                for n in nodes
            ]
        )
    try:
        editor = cmdutil.getcommiteditor(
            editform=b'gpg.sign', **pycompat.strkwargs(opts)
        )
        repo.commit(
            message, opts[b'user'], opts[b'date'], match=msigs, editor=editor
        )
    except ValueError as inst:
        raise error.Abort(pycompat.bytestr(inst))


def node2txt(repo, node, ver):
    """map a manifest into some text"""
    if ver == b"0":
        return b"%s\n" % hgnode.hex(node)
    else:
        raise error.Abort(_(b"unknown signature version"))


def extsetup(ui):
    # Add our category before "Repository maintenance".
    help.CATEGORY_ORDER.insert(
        help.CATEGORY_ORDER.index(command.CATEGORY_MAINTENANCE), _HELP_CATEGORY
    )
    help.CATEGORY_NAMES[_HELP_CATEGORY] = b'GPG signing'