view mercurial/changegroup.py @ 20182:04036798ebed

branches: avoid unnecessary changectx.branch() calls This requires reading from the changelog, which can be costly over NFS. Note that this does not totally remove reading from the changelog; we still do that when calling changectx.closesbranch(). That call will be removed in a later patch. Running hg branches on the PyPy repo (with 996) over a busy NFS server, before this change: $ time hg --profile branches > /dev/null CallCount Recursive Total(s) Inline(s) module:lineno(function) 2042 0 2.2827 2.2827 <open> 2036 0 0.9840 0.9840 <method 'close' of 'file' objects> 2036 0 0.0464 0.0464 <method 'read' of 'file' objects> 5233 0 0.1985 0.0453 mercurial.repoview:161(changelog) 10462 0 0.0791 0.0314 mercurial.changelog:133(tip) 5233 0 0.0388 0.0176 mercurial.localrepo:26(__get__) 10462 0 0.0250 0.0126 <len> 5233 0 0.0059 0.0039 mercurial.repoview:112(filterrevs) 10462 0 0.0029 0.0029 <hash> 2034 0 0.0444 0.0444 <method 'seek' of 'file' objects> 5340 0 0.0390 0.0390 mercurial.revlog:296(rev) 2582 0 0.0371 0.0371 <zlib.decompress> 3155 0 0.1963 0.0366 mercurial.context:202(__init__) 3155 0 0.1238 0.0306 mercurial.repoview:161(changelog) 3155 0 0.0261 0.0080 mercurial.changelog:183(rev) 9465 0 0.0061 0.0061 <isinstance> 1096 0 0.0023 0.0023 <binascii.unhexlify> 4251 0 0.0014 0.0014 <len> 2059 0 3.7341 0.0332 mercurial.changelog:270(read) 2059 0 3.6304 0.0307 mercurial.revlog:907(revision) 2057 0 0.0262 0.0137 mercurial.changelog:28(decodeextra) 4118 0 0.0094 0.0094 <method 'split' of 'str' objects> 4118 0 0.0270 0.0048 mercurial.encoding:61(tolocal) 2059 0 0.0040 0.0040 <method 'index' of 'str' objects> 10462 0 0.0791 0.0314 mercurial.changelog:133(tip) 10462 0 0.0289 0.0207 mercurial.changelog:190(node) 10462 0 0.0188 0.0091 <len> 52433 20932 0.0478 0.0310 <len> 20932 0 0.0221 0.0168 mercurial.revlog:262(__len__) 2059 0 3.6304 0.0307 mercurial.revlog:907(revision) real 0m4.361s user 0m0.986s sys 0m0.237s After this change: $ time hg --profile branches > /dev/null CallCount Recursive Total(s) Inline(s) module:lineno(function) 1069 0 1.1098 1.1098 <open> 1063 0 0.4865 0.4865 <method 'close' of 'file' objects> 4122 0 0.1811 0.0404 mercurial.repoview:161(changelog) 8240 0 0.0712 0.0272 mercurial.changelog:133(tip) 4122 0 0.0378 0.0177 mercurial.localrepo:26(__get__) 8240 0 0.0221 0.0115 <len> 4122 0 0.0057 0.0033 mercurial.repoview:112(filterrevs) 8240 0 0.0025 0.0025 <hash> 3029 0 0.1979 0.0371 mercurial.context:202(__init__) 3029 0 0.1278 0.0310 mercurial.repoview:161(changelog) 3029 0 0.0230 0.0081 mercurial.changelog:183(rev) 9087 0 0.0061 0.0061 <isinstance> 1096 0 0.0026 0.0026 <binascii.unhexlify> 4125 0 0.0014 0.0014 <len> 4229 0 0.0337 0.0337 mercurial.revlog:296(rev) 1061 0 0.0296 0.0296 <method 'seek' of 'file' objects> 1063 0 0.0292 0.0292 <method 'read' of 'file' objects> 8240 0 0.0712 0.0272 mercurial.changelog:133(tip) 8240 0 0.0271 0.0196 mercurial.changelog:190(node) 8240 0 0.0169 0.0083 <len> 40476 16488 0.0422 0.0271 <len> 16488 0 0.0193 0.0152 mercurial.revlog:262(__len__) 1342 0 0.0241 0.0241 <zlib.decompress> 9445 0 0.0336 0.0224 mercurial.changelog:190(node) 9445 0 0.0112 0.0112 mercurial.revlog:317(node) 1074 0 1.9102 0.0224 mercurial.changelog:270(read) 1074 0 1.8397 0.0202 mercurial.revlog:907(revision) 1073 0 0.0187 0.0099 mercurial.changelog:28(decodeextra) 2148 0 0.0061 0.0061 <method 'split' of 'str' objects> 2148 0 0.0184 0.0034 mercurial.encoding:61(tolocal) real 0m2.402s user 0m0.735s sys 0m0.177s
author Brodie Rao <brodie@sf.io>
date Fri, 15 Nov 2013 23:18:08 -0500
parents fd4f612f7cb6
children f8d50add83e1 bfb40168391c
line wrap: on
line source

# changegroup.py - Mercurial changegroup manipulation functions
#
#  Copyright 2006 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from i18n import _
from node import nullrev, hex
import mdiff, util, dagutil
import struct, os, bz2, zlib, tempfile

_BUNDLE10_DELTA_HEADER = "20s20s20s20s"

def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    s = stream.read(n)
    if len(s) < n:
        raise util.Abort(_("stream ended unexpectedly"
                           " (got %d bytes, expected %d)")
                          % (len(s), n))
    return s

def getchunk(stream):
    """return the next chunk from stream as a string"""
    d = readexactly(stream, 4)
    l = struct.unpack(">l", d)[0]
    if l <= 4:
        if l:
            raise util.Abort(_("invalid chunk length %d") % l)
        return ""
    return readexactly(stream, l - 4)

def chunkheader(length):
    """return a changegroup chunk header (string)"""
    return struct.pack(">l", length + 4)

def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    return struct.pack(">l", 0)

class nocompress(object):
    def compress(self, x):
        return x
    def flush(self):
        return ""

bundletypes = {
    "": ("", nocompress), # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG10UN": ("HG10UN", nocompress),
    "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
    "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

def writebundle(cg, filename, bundletype):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    fh = None
    cleanup = None
    try:
        if filename:
            fh = open(filename, "wb")
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        cleanup = filename

        header, compressor = bundletypes[bundletype]
        fh.write(header)
        z = compressor()

        # parse the changegroup data, otherwise we will block
        # in case of sshrepo because we don't know the end of the stream

        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, an empty chunkgroup is the end of the changegroup
        empty = False
        count = 0
        while not empty or count <= 2:
            empty = True
            count += 1
            while True:
                chunk = getchunk(cg)
                if not chunk:
                    break
                empty = False
                fh.write(z.compress(chunkheader(len(chunk))))
                pos = 0
                while pos < len(chunk):
                    next = pos + 2**20
                    fh.write(z.compress(chunk[pos:next]))
                    pos = next
            fh.write(z.compress(closechunk()))
        fh.write(z.flush())
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            os.unlink(cleanup)

def decompressor(fh, alg):
    if alg == 'UN':
        return fh
    elif alg == 'GZ':
        def generator(f):
            zd = zlib.decompressobj()
            for chunk in util.filechunkiter(f):
                yield zd.decompress(chunk)
    elif alg == 'BZ':
        def generator(f):
            zd = bz2.BZ2Decompressor()
            zd.decompress("BZ")
            for chunk in util.filechunkiter(f, 4096):
                yield zd.decompress(chunk)
    else:
        raise util.Abort("unknown bundle compression '%s'" % alg)
    return util.chunkbuffer(generator(fh))

class unbundle10(object):
    deltaheader = _BUNDLE10_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    def __init__(self, fh, alg):
        self._stream = decompressor(fh, alg)
        self._type = alg
        self.callback = None
    def compressed(self):
        return self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def chunklength(self):
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise util.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self.chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return dict(filename=fname)

    def _deltaheader(self, headertuple, prevnode):
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        return node, p1, p2, deltabase, cs

    def deltachunk(self, prevnode):
        l = self.chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
        return dict(node=node, p1=p1, p2=p2, cs=cs,
                    deltabase=deltabase, delta=delta)

class headerlessfixup(object):
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if self._h:
            d, self._h = self._h[:n], self._h[n:]
            if len(d) < n:
                d += readexactly(self._fh, n - len(d))
            return d
        return readexactly(self._fh, n)

def readbundle(fh, fname):
    header = readexactly(fh, 6)

    if not fname:
        fname = "stream"
        if not header.startswith('HG') and header.startswith('\0'):
            fh = headerlessfixup(fh, header)
            header = "HG10UN"

    magic, version, alg = header[0:2], header[2:4], header[4:6]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version != '10':
        raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
    return unbundle10(fh, alg)

class bundle10(object):
    deltaheader = _BUNDLE10_DELTA_HEADER
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        self._changelog = repo.changelog
        self._manifest = repo.manifest
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
    def close(self):
        return closechunk()

    def fileheader(self, fname):
        return chunkheader(len(fname)) + fname

    def group(self, nodelist, revlog, lookup, units=None, reorder=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and reorder is not False) or reorder:
            dag = dagutil.revlogdag(revlog)
            revs = set(revlog.rev(n) for n in nodelist)
            revs = dag.linearize(revs)
        else:
            revs = sorted([revlog.rev(n) for n in nodelist])

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs, source):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = self._changelog
        mf = self._manifest
        reorder = self._reorder
        progress = self._progress

        # for progress output
        msgbundling = _('bundling')

        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            changedfiles.update(c[3])
            # record the first changeset introducing this manifest version
            mfs.setdefault(c[0], x)
            return x

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def lookupmf(x):
            clnode = mfs[x]
            if not fastpathlinkrev:
                mdata = mf.readfast(x)
                for f, n in mdata.iteritems():
                    if f in changedfiles:
                        # record the first changeset introducing this filelog
                        # version
                        fnodes[f].setdefault(n, clnode)
            return clnode

        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets'),
                                reorder=reorder):
            yield chunk
        progress(msgbundling, None)

        for f in changedfiles:
            fnodes[f] = {}
        mfnodes = self.prune(mf, mfs, commonrevs, source)
        for chunk in self.group(mfnodes, mf, lookupmf, units=_('manifests'),
                                reorder=reorder):
            yield chunk
        progress(msgbundling, None)

        mfs.clear()
        needed = set(cl.rev(x) for x in clnodes)

        def linknodes(filerevlog, fname):
            if fastpathlinkrev:
                ln, llr = filerevlog.node, filerevlog.linkrev
                def genfilenodes():
                    for r in filerevlog:
                        linkrev = llr(r)
                        if linkrev in needed:
                            yield filerevlog.node(r), cl.node(linkrev)
                fnodes[fname] = dict(genfilenodes())
            return fnodes.get(fname, {})

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()
        progress(msgbundling, None)

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        repo = self._repo
        progress = self._progress
        reorder = self._reorder
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise util.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs, source)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                yield self.fileheader(fname)
                for chunk in self.group(filenodes, filerevlog, lookupfilelog,
                                        reorder=reorder):
                    yield chunk

    def revchunk(self, revlog, rev, prev, linknode):
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = prev

        prefix = ''
        if base == nullrev:
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        # do nothing with basenode, it is implicitly the previous one in HG10
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)