push: move `revs` argument into the push object
One more step toward a more modular push function.
# changegroup.py - Mercurial changegroup manipulation functions
#
# Copyright 2006 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from i18n import _
from node import nullrev, hex
import mdiff, util, dagutil
import struct, os, bz2, zlib, tempfile
# struct format shared by unbundle10 and bundle10 for the header of a delta
# chunk: four consecutive 20-byte string fields. unbundle10 unpacks them as
# (node, p1, p2, cs); bundle10 packs (node, p1n, p2n, linknode).
_BUNDLE10_DELTA_HEADER = "20s20s20s20s"
def readexactly(stream, n):
    """Return exactly n bytes obtained from a single stream.read(n) call.

    Raises util.Abort when the stream hands back fewer than n bytes.
    """
    data = stream.read(n)
    got = len(data)
    if got >= n:
        return data
    raise util.Abort(_("stream ended unexpectedly"
                       " (got %d bytes, expected %d)")
                     % (got, n))
def getchunk(stream):
    """Read one length-prefixed chunk from stream and return its payload.

    The 4-byte big-endian signed length that precedes the payload counts
    itself, so the payload is (length - 4) bytes long. A length of zero
    yields the empty string; any other length not greater than 4 raises
    util.Abort.
    """
    prefix = readexactly(stream, 4)
    (length,) = struct.unpack(">l", prefix)
    if length > 4:
        return readexactly(stream, length - 4)
    if length:
        raise util.Abort(_("invalid chunk length %d") % length)
    return ""
def chunkheader(length):
    """Build the header (string) announcing a chunk with a length-byte payload.

    The packed value is length + 4: the 4 bytes of the header itself are
    included in the size written on the wire.
    """
    total = length + 4
    return struct.pack(">l", total)
def closechunk():
    """Build the header (string) of a zero-length chunk.

    This is a packed length of 0, not of 4: getchunk() and
    unbundle10.chunklength() treat a zero length as "no chunk".
    """
    terminator = struct.pack(">l", 0)
    return terminator
class nocompress(object):
    """Pass-through object offering the compress()/flush() pair.

    It lets uncompressed bundle types be written through the same code path
    as the bz2 and zlib compressor objects.
    """

    def compress(self, x):
        """Return x unchanged."""
        return x

    def flush(self):
        """Return the empty string; nothing is ever buffered."""
        return ""
# Maps a bundle type name to a (header, compressor factory) pair. writebundle
# writes the header string to the file first, then calls the factory to get
# the object whose compress()/flush() output makes up the rest of the file.
bundletypes = {
    "": ("", nocompress), # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG10UN": ("HG10UN", nocompress),
    # NOTE(review): the header written here is only "HG10"; presumably the
    # "BZ" that completes the 6-byte header is the start of the bz2 stream
    # itself -- confirm against decompressor(), which feeds "BZ" back into the
    # BZ2Decompressor for alg 'BZ'.
    "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
    "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
}
# hgweb uses this list to communicate its preferred type
# (every entry is a key of bundletypes; presumably ordered most-preferred
# first -- TODO confirm against the hgweb caller)
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
def writebundle(cg, filename, bundletype):
    """Write the changegroup read from cg to a bundle file; return its name.

    cg is read chunk by chunk with getchunk(). If filename is empty, a
    temporary file (prefix "hg-bundle-", suffix ".hg") is created; otherwise
    filename is opened in "wb" mode. bundletype is a key of bundletypes and
    selects the header written first and the compressor applied to the data.

    If anything fails after the file has been opened, the file is removed.
    """
    fh = None
    cleanup = None
    try:
        if not filename:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        else:
            fh = open(filename, "wb")
        # from here on a failure must not leave a partial bundle behind
        cleanup = filename

        header, compressor = bundletypes[bundletype]
        fh.write(header)
        z = compressor()

        # parse the changegroup data, otherwise we will block
        # in case of sshrepo because we don't know the end of the stream

        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, an empty chunkgroup is the end of the changegroup
        groups = 0
        while True:
            groups += 1
            groupempty = True
            while True:
                chunk = getchunk(cg)
                if not chunk:
                    break
                groupempty = False
                size = len(chunk)
                fh.write(z.compress(chunkheader(size)))
                # hand the payload to the compressor in slices of 2**20 bytes
                start = 0
                while start < size:
                    stop = start + 2**20
                    fh.write(z.compress(chunk[start:stop]))
                    start = stop
            fh.write(z.compress(closechunk()))
            if groupempty and groups > 2:
                break
        fh.write(z.flush())
        # success: keep the file
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            os.unlink(cleanup)
def decompressor(fh, alg):
    """Return an object from which the uncompressed content of fh is read.

    alg is the two-letter compression code: 'UN' returns fh itself, 'GZ' and
    'BZ' return a util.chunkbuffer fed by a generator that decompresses the
    pieces produced by util.filechunkiter. Any other code raises util.Abort.
    """
    if alg == 'UN':
        return fh

    def gunzip(f):
        zd = zlib.decompressobj()
        for piece in util.filechunkiter(f):
            yield zd.decompress(piece)

    def bunzip(f):
        zd = bz2.BZ2Decompressor()
        # prime the decompressor with "BZ" before any data from f
        zd.decompress("BZ")
        for piece in util.filechunkiter(f, 4096):
            yield zd.decompress(piece)

    if alg == 'GZ':
        generator = gunzip
    elif alg == 'BZ':
        generator = bunzip
    else:
        raise util.Abort("unknown bundle compression '%s'" % alg)
    return util.chunkbuffer(generator(fh))
class unbundle10(object):
    """Reader for a version-10 changegroup stream.

    Wraps a file object and the compression code of its content, and offers
    file-like access plus helpers that parse chunk lengths, the filelog
    header and delta chunks.
    """
    deltaheader = _BUNDLE10_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)

    def __init__(self, fh, alg):
        """Wrap fh, whose content is compressed according to alg."""
        self._stream = decompressor(fh, alg)
        self._type = alg
        # optional callable invoked by chunklength() for each non-empty chunk
        self.callback = None

    def compressed(self):
        """Tell whether the underlying data is compressed (alg is not 'UN')."""
        return self._type != 'UN'

    def read(self, l):
        """Delegate to the read() of the decompressed stream."""
        return self._stream.read(l)

    def seek(self, pos):
        """Delegate to the seek() of the decompressed stream."""
        return self._stream.seek(pos)

    def tell(self):
        """Delegate to the tell() of the decompressed stream."""
        return self._stream.tell()

    def close(self):
        """Delegate to the close() of the decompressed stream."""
        return self._stream.close()

    def chunklength(self):
        """Read a chunk header and return the size of the payload after it.

        Returns 0 for a zero-length chunk and raises util.Abort for any
        other length not greater than 4. The callback, if set, is invoked
        only for chunks that carry a payload.
        """
        raw = readexactly(self._stream, 4)
        (length,) = struct.unpack(">l", raw)
        if length > 4:
            if self.callback:
                self.callback()
            return length - 4
        if length:
            raise util.Abort(_("invalid chunk length %d") % length)
        return 0

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        size = self.chunklength()
        if size:
            return {'filename': readexactly(self._stream, size)}
        return {}

    def _deltaheader(self, headertuple, prevnode):
        """Expand the 4-field header into (node, p1, p2, deltabase, cs).

        The delta base is prevnode, or p1 when prevnode is None.
        """
        node, p1, p2, cs = headertuple
        deltabase = prevnode
        if deltabase is None:
            deltabase = p1
        return (node, p1, p2, deltabase, cs)

    def deltachunk(self, prevnode):
        """Read the next delta chunk and return it as a dict.

        The dict has the keys node, p1, p2, cs, deltabase and delta. An empty
        dict is returned when a zero-length chunk is read.
        """
        size = self.chunklength()
        if not size:
            return {}
        headersize = self.deltaheadersize
        rawheader = readexactly(self._stream, headersize)
        fields = struct.unpack(self.deltaheader, rawheader)
        # whatever follows the fixed-size header is the delta itself
        delta = readexactly(self._stream, size - headersize)
        node, p1, p2, deltabase, cs = self._deltaheader(fields, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta}
class headerlessfixup(object):
    """File-like wrapper that replays already-read bytes before a stream.

    read() serves the bytes of h first and, once they are used up, continues
    with bytes read from fh.
    """

    def __init__(self, fh, h):
        self._h = h
        self._fh = fh

    def read(self, n):
        """Return exactly n bytes, taken from the pending bytes then from fh."""
        if not self._h:
            return readexactly(self._fh, n)
        head = self._h[:n]
        self._h = self._h[n:]
        missing = n - len(head)
        if missing > 0:
            # pending bytes ran out part-way; top up from the real stream
            head += readexactly(self._fh, missing)
        return head
def readbundle(fh, fname):
    """Read the 6-byte bundle header from fh and return an unbundle10 for it.

    fname is only used in error messages; "stream" is used when it is empty.
    If the first 6 bytes start with a NUL byte instead of 'HG', they are
    pushed back in front of fh and the data is handled as "HG10UN".

    Raises util.Abort if the magic is not 'HG' or the version is not '10'.
    """
    header = readexactly(fh, 6)
    fname = fname or "stream"

    if not header.startswith('HG') and header.startswith('\0'):
        # headerless data: replay the bytes consumed while looking for a header
        fh = headerlessfixup(fh, header)
        header = "HG10UN"

    magic = header[0:2]
    version = header[2:4]
    alg = header[4:6]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version != '10':
        raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
    return unbundle10(fh, alg)
class bundle10(object):
    """Producer of version-10 changegroup chunks for a repository."""
    deltaheader = _BUNDLE10_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        self._changelog = repo.changelog
        self._manifest = repo.manifest
        # bundle.reorder: 'auto' is stored as None, anything else goes
        # through util.parsebool
        setting = repo.ui.config('bundle', 'reorder', 'auto')
        if setting != 'auto':
            setting = util.parsebool(setting)
        else:
            setting = None
        self._repo = repo
        self._reorder = setting
        self._progress = repo.ui.progress

    def close(self):
        """Return the chunk that terminates a group."""
        return closechunk()

    def fileheader(self, fname):
        """Return the chunk that announces the filelog of fname."""
        return chunkheader(len(fname)) + fname

    def group(self, nodelist, revlog, lookup, units=None, reorder=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if not nodelist:
            yield self.close()
            return

        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        linearize = (revlog._generaldelta and reorder is not False) or reorder
        if linearize:
            dag = dagutil.revlogdag(revlog)
            revs = dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            revs = [revlog.rev(n) for n in nodelist]
            revs.sort()

        # add the parent of the first rev
        firstparent = revlog.parentrevs(revs[0])[0]
        revs.insert(0, firstparent)

        # build deltas: each rev is sent as a delta against its predecessor
        # in the list
        total = len(revs) - 1
        msgbundling = _('bundling')
        base = revs[0]
        for done, rev in enumerate(revs[1:]):
            if units is not None:
                self._progress(msgbundling, done + 1, unit=units, total=total)
            linknode = lookup(revlog.node(rev))
            for chunk in self.revchunk(revlog, rev, base, linknode):
                yield chunk
            base = rev

        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs, source):
        """Return the nodes of missing whose linkrev is not in commonrevs."""
        torev = revlog.rev
        tolinkrev = revlog.linkrev
        kept = []
        for node in missing:
            if tolinkrev(torev(node)) in commonrevs:
                continue
            kept.append(node)
        return kept

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        changelog = self._changelog
        manifest = self._manifest
        reorder = self._reorder
        progress = self._progress

        # for progress output
        msgbundling = _('bundling')

        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            entry = changelog.read(x)
            changedfiles.update(entry[3])
            # record the first changeset introducing this manifest version
            mfs.setdefault(entry[0], x)
            return x

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def lookupmf(x):
            clnode = mfs[x]
            if fastpathlinkrev:
                return clnode
            for fname, fnode in manifest.readfast(x).iteritems():
                if fname in changedfiles:
                    # record the first changeset introducing this filelog
                    # version
                    fnodes[fname].setdefault(fnode, clnode)
            return clnode

        # changelog group
        for chunk in self.group(clnodes, changelog, lookupcl,
                                units=_('changesets'), reorder=reorder):
            yield chunk
        progress(msgbundling, None)

        # changedfiles is complete now; give lookupmf a dict per file
        for fname in changedfiles:
            fnodes[fname] = {}

        # manifest group
        mfnodes = self.prune(manifest, mfs, commonrevs, source)
        for chunk in self.group(mfnodes, manifest, lookupmf,
                                units=_('manifests'), reorder=reorder):
            yield chunk
        progress(msgbundling, None)

        mfs.clear()
        needed = set(changelog.rev(x) for x in clnodes)

        def linknodes(filerevlog, fname):
            if fastpathlinkrev:
                # rebuild the file node -> changeset node mapping straight
                # from the linkrevs stored in the filelog
                tonode = filerevlog.node
                tolinkrev = filerevlog.linkrev
                found = {}
                for rev in filerevlog:
                    linkrev = tolinkrev(rev)
                    if linkrev in needed:
                        fnode = tonode(rev)
                        found[fnode] = changelog.node(linkrev)
                fnodes[fname] = found
            return fnodes.get(fname, {})

        # file groups
        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()
        progress(msgbundling, None)

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """Yield the file header and delta group of every changed file.

        Files are handled in sorted order. A file with no nodes left after
        prune() produces no chunks. Raises util.Abort when the revlog of a
        changed file is empty or missing.
        """
        repo = self._repo
        progress = self._progress
        reorder = self._reorder
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for pos, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise util.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs,
                                   source)
            if not filenodes:
                continue
            progress(msgbundling, pos + 1, item=fname, unit=msgfiles,
                     total=total)
            yield self.fileheader(fname)
            for chunk in self.group(filenodes, filerevlog, lookupfilelog,
                                    reorder=reorder):
                yield chunk

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the chunk header, delta header and delta for rev.

        The delta is computed against prev; when prev is nullrev the full
        revision text is sent, preceded by a trivial diff header.
        """
        node = revlog.node(rev)
        # result unused; the call is kept so behavior stays unchanged
        p1, p2 = revlog.parentrevs(rev)
        if prev == nullrev:
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(prev, rev)
            prefix = ''
        p1n, p2n = revlog.parents(node)
        header = self.builddeltaheader(node, p1n, p2n, revlog.node(prev),
                                       linknode)
        header += prefix
        yield chunkheader(len(header) + len(delta))
        yield header
        yield delta

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        """Pack the delta header of a revision chunk."""
        # do nothing with basenode, it is implicitly the previous one in HG10
        fields = (node, p1n, p2n, linknode)
        return struct.pack(self.deltaheader, *fields)