Mercurial > evolve
view hgext3rd/pullbundle.py @ 4136:be3a94d3105f
pullbundle: attempt to save stablerange cache after each computation
Since we do not serve the full repository, we use more stable ranges starting from
a random point. These are probably not loaded yet and should be cached.
This is best-effort caching: if we can't save them, we won't.
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Mon, 24 Sep 2018 01:16:00 +0200 |
parents | 47f1d7b4305d |
children | cfdc6f55599b |
line wrap: on
line source
# Extension to provide automatic caching of bundle server for pull
#
# Copyright 2018 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""pullbundle: automatic server side bundle caching

General principle
=================

This extension provides a means for servers to use pre-computed bundles for
serving arbitrary pulls. If missing, the necessary pre-computed bundles will be
generated on demand.

To maximize usage of existing cached bundles, each pull will be served through
multiple bundles. The bundles will be created using "standard ranges" from the
"stablerange" principle. The "stablerange" concept is already used for
obsmarkers discovery in the evolve extension.

Using pull Bundle
=================

All configuration is only required server side.

The "stablerange" code currently still lives in the evolve extension, so for
now enabling that extension is required.

You need at minimum the following configuration::

    [extensions]
    evolve=yes
    pullbundle=yes
    [experimental]
    obshashrange.warm-cache = yes

If you do not want to use evolution server side, you should disable obsmarkers
exchange::

    [experimental]
    evolution.exchange=no

Extra Configuration
===================

::

    [pullbundle]
    # By default bundles are stored in `.hg/cache/pullbundles/`.
    # This can be changed with the following config:
    cache-directory=/absolute/path

Implementation status
=====================

Both stablerange and pullbundle use "simple" initial implementations. These
implementations focus on testing the algorithms and proving the features work.
Yet they are already useful and used in production.

Performance is expected to greatly improve in the final implementation,
especially if some of it ends up being compiled.

This first implementation lacks the ability to serve the cached bundles from a
CDN. We'll want this limitation to be lifted quickly.

Why does this live in the same repository as evolve
===================================================

There is no fundamental reason for it to live in the same repository. However,
the stablerange data-structure lives in evolve, so it was simpler to put this
new extension next to it. As soon as stable range has been upstreamed, we
won't need the dependency on the evolve extension anymore.
"""

import errno
import os

from mercurial import (
    changegroup,
    discovery,
    exchange,
    narrowspec,
    node as nodemod,
    registrar,
    util,
)

from mercurial.i18n import _

__version__ = '0.1.0.dev'
testedwith = '4.7.1'
# minimumhgversion = ''
buglink = 'https://bz.mercurial-scm.org/'

configtable = {}
configitem = registrar.configitem(configtable)

configitem('pullbundle', 'cache-directory',
           default=None,
)

# generic wrapping

def uisetup(ui):
    """Route bundle2 'changegroup' part generation through this extension."""
    exchange.getbundle2partsmapping['changegroup'] = _getbundlechangegrouppart

def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle

    This mirrors the regular changegroup part generator; only the section
    between the ALTERED PART markers differs, delegating part construction
    to `makeallcgpart` so the changegroup can be split into cached slices.
    """
    if not kwargs.get(r'cg', True):
        return

    version = '01'
    cgversions = b2caps.get('changegroup')
    if cgversions:  # 3.1 and 3.2 ship with an empty value
        cgversions = [v for v in cgversions
                      if v in changegroup.supportedoutgoingversions(repo)]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)

    outgoing = exchange._computeoutgoing(repo, heads, common)
    if not outgoing.missing:
        return

    if kwargs.get(r'narrow', False):
        include = sorted(filter(bool, kwargs.get(r'includepats', [])))
        exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
        filematcher = narrowspec.match(repo.root, include=include,
                                       exclude=exclude)
    else:
        filematcher = None

    # START OF ALTERED PART
    makeallcgpart(bundler.newpart, repo, outgoing, version, source, bundlecaps,
                  filematcher, cgversions)
    # END OF ALTERED PART

    if kwargs.get(r'narrow', False) and (include or exclude):
        narrowspecpart = bundler.newpart('narrow:spec')
        if include:
            narrowspecpart.addparam(
                'include', '\n'.join(include), mandatory=True)
        if exclude:
            narrowspecpart.addparam(
                'exclude', '\n'.join(exclude), mandatory=True)

def makeallcgpart(newpart, repo, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
    """Add the changegroup part(s) needed to transmit `outgoing`.

    Narrow requests (a `filematcher` is set) and repositories lacking the
    evolve `stablerange` support are served with a single, uncached part.
    Otherwise the missing set is sliced into stable ranges and each slice is
    served through its own (possibly cached) part.
    """
    pullbundle = not filematcher
    if pullbundle and not util.safehasattr(repo, 'stablerange'):
        repo.ui.warn(
            'pullbundle: required extension "evolve" are missing, skipping pullbundle\n')
        pullbundle = False
    # branch on `pullbundle` (not just `filematcher`) so that a missing
    # evolve extension falls back to a plain part instead of crashing on
    # `repo.stablerange` inside sliceoutgoing.
    if not pullbundle:
        makeonecgpart(newpart, repo, None, outgoing, version, source,
                      bundlecaps, filematcher, cgversions)
    else:
        start = util.timer()
        slices = sliceoutgoing(repo, outgoing)
        end = util.timer()
        msg = _('pullbundle-cache: "missing" set sliced into %d subranges '
                'in %s seconds\n')
        repo.ui.write(msg % (len(slices), end - start))
        for sliceid, sliceout in slices:
            makeonecgpart(newpart, repo, sliceid, sliceout, version, source,
                          bundlecaps, filematcher, cgversions)

# stable range slicing

def sliceoutgoing(repo, outgoing):
    """Slice the missing changesets of `outgoing` into stable ranges.

    For each missing head, revisions are walked with `repo.stablesort` and a
    slice is cut every time the walk reaches a revision that is not missing;
    each slice is then split into canonical subranges.

    Returns a list of `(rangeid, outgoing)` pairs, one per subrange.
    """
    cl = repo.changelog
    rev = cl.nodemap.get
    node = cl.node
    revsort = repo.stablesort

    missingrevs = set(rev(n) for n in outgoing.missing)
    allslices = []
    missingheads = [rev(n) for n in outgoing.missingheads]
    for head in missingheads:
        localslices = []
        localmissing = set(repo.revs('%ld and ::%d', missingrevs, head))
        while localmissing:
            slicerevs = []
            for r in revsort.walkfrom(repo, head):
                if r not in missingrevs:
                    break
                slicerevs.append(r)
            slicenodes = [node(r) for r in slicerevs]
            localslices.extend(canonicalslices(repo, slicenodes))
            missingrevs.difference_update(slicerevs)
            localmissing.difference_update(slicerevs)
            if localmissing:
                # restart the walk from the highest revision still missing
                head = max(localmissing)

        allslices.extend(localslices)

    # slicing may have computed previously unknown subranges; persist them
    # so later pulls can reuse them (intended as best-effort caching)
    repo.stablerange.save(repo)
    return [(rangeid, outgoingfromnodes(repo, nodes))
            for rangeid, nodes in allslices]

def canonicalslices(repo, nodes):
    """Split a contiguous run of `nodes` into canonical stable subranges.

    `nodes[0]` is taken as the head of the range (the list is produced by a
    walk starting from the head). Note that `nodes` is reversed in place
    before being cut into pieces.

    Returns a list of `(rangeid, nodes)` pairs, one per canonical subrange.
    """
    depth = repo.depthcache.get
    stablerange = repo.stablerange
    rangelength = lambda x: stablerange.rangelength(repo, x)
    headrev = repo.changelog.rev(nodes[0])
    nbrevs = len(nodes)
    headdepth = depth(headrev)
    skipped = headdepth - nbrevs
    rangeid = (headrev, skipped)

    subranges = canonicalsubranges(repo, stablerange, rangeid)
    idx = 0
    slices = []
    nodes.reverse()
    for rangeid in subranges:
        size = rangelength(rangeid)
        slices.append((rangeid, nodes[idx:idx + size]))
        idx += size
    return slices

def canonicalsubranges(repo, stablerange, rangeid):
    """slice a size of nodes into most reusable subranges

    We try to slice a range into a set of "largest" and "canonical" stable
    ranges.

    It might make sense to move this function as a 'stablerange' method.
    """
    headrev, skip = rangeid
    rangedepth = stablerange.depthrev(repo, rangeid[0])
    canonicals = []

    # 0. find the first power of 2 higher than this range depth
    cursor = 1
    while cursor <= rangedepth:
        cursor *= 2

    # 1. find first cut
    precut = cut = 0
    while True:
        if skip <= cut:
            break
        if cut + cursor < rangedepth:
            precut = cut
            cut += cursor
        if cursor == 1:
            break
        cursor //= 2

    # 2. optimise, bottom part
    if skip != cut:
        tailranges = []
        tailsize = cut - skip
        assert 0 < tailsize, tailsize
        prerange = (headrev, precut)
        size = stablerange.rangelength(repo, prerange)
        sub = stablerange.subranges(repo, prerange)[:-1]
        # This power-of-two check is too simplistic and misbehaves when many
        # merges are involved: because of merges, there can be "canonical"
        # ranges of various sizes.
        #
        # NOTE(review): poweroftwo(0) is falsy, so if `tailsize` reaches 0
        # while `sub` still has entries, the for-loop breaks and this
        # while-loop never terminates. Likewise, if the initial `tailsize`
        # is already a power of two the body never runs and `tailranges`
        # stays empty. Confirm neither case can occur for stable ranges.
        while not poweroftwo(tailsize):
            for prerange in reversed(sub):
                if tailsize <= 0:
                    break

                assert stablerange.depthrev(repo, prerange[0]) != prerange[1], prerange
                tailrev, tailskip = prerange
                size = stablerange.rangelength(repo, (tailrev, tailskip))
                if tailsize < size:
                    # only keep the part of this range that is needed
                    tailskip += size - tailsize
                    size = tailsize
                tailranges.append((tailrev, tailskip))
                tailsize -= size
            else:
                # size of the last block
                tailsize = stablerange.rangelength(repo, tailranges[-1])
                if poweroftwo(tailsize):
                    continue  # the while condition now ends the loop
                # split the last block further on the next iteration
                prerange = tailranges.pop()
                sub = stablerange.subranges(repo, prerange)

        tailranges.reverse()
        canonicals.extend(tailranges)

    # 3. take recursive subrange until we get to a power of two size?
    current = (headrev, cut)
    while not poweroftwo(stablerange.rangelength(repo, current)):
        sub = stablerange.subranges(repo, current)
        canonicals.extend(sub[:-1])
        current = sub[-1]
    canonicals.append(current)

    return canonicals

def poweroftwo(num):
    """Return a truthy value when `num` is a power of two (1, 2, 4, ...).

    0 yields 0 (falsy).
    """
    return num and not num & (num - 1)

def outgoingfromnodes(repo, nodes):
    """Build a `discovery.outgoing` using `nodes` as missing roots and heads."""
    return discovery.outgoing(repo,
                              missingroots=nodes,
                              missingheads=nodes)

# changegroup part construction

def _changegroupinfo(repo, nodes, source):
    """Report the changeset count, only when verbose or bundling."""
    if repo.ui.verbose or source == 'bundle':
        repo.ui.status(_("%d changesets found\n") % len(nodes))

def _makenewstream(newpart, repo, outgoing, version, source,
                   bundlecaps, filematcher, cgversions):
    """Generate a fresh changegroup stream for `outgoing`.

    `changegroup._changegroupinfo` is temporarily replaced by the local
    `_changegroupinfo` while the stream is created. `newpart` is unused.

    Returns `(cgstream, nbchanges, pversion)`; `pversion` is `version` when
    `cgversions` is non-empty and None otherwise.
    """
    old = changegroup._changegroupinfo
    try:
        changegroup._changegroupinfo = _changegroupinfo
        cgstream = changegroup.makestream(repo, outgoing, version, source,
                                          bundlecaps=bundlecaps,
                                          filematcher=filematcher)
    finally:
        changegroup._changegroupinfo = old

    nbchanges = len(outgoing.missing)
    pversion = None
    if cgversions:
        pversion = version
    return (cgstream, nbchanges, pversion)

def _makepartfromstream(newpart, repo, cgstream, nbchanges, version):
    """Create a 'changegroup' bundle2 part carrying `cgstream`."""
    # same as upstream code

    part = newpart('changegroup', data=cgstream)
    if version:
        part.addparam('version', version)

    part.addparam('nbchanges', '%d' % nbchanges,
                  mandatory=False)

    if 'treemanifest' in repo.requirements:
        part.addparam('treemanifest', '1')

# cache management

def cachedir(repo):
    """Return the directory holding cached bundles.

    Uses `pullbundle.cache-directory` when configured, otherwise the
    `pullbundles` directory inside the repository cache vfs.
    """
    cachedir = repo.ui.config('pullbundle', 'cache-directory')
    if cachedir is not None:
        return cachedir
    return repo.cachevfs.join('pullbundles')

def getcache(repo, bundlename):
    """Return a chunk iterator over the cached bundle `bundlename`.

    Returns None when no such bundle is cached (ENOENT); any other I/O error
    is propagated. The underlying file is closed when iteration finishes or
    the returned iterator is closed.
    """
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    try:
        fd = open(bundlepath, 'rb')
    except IOError as exc:
        if exc.errno != errno.ENOENT:
            raise
        return None
    return _closingchunkiter(fd)

def _closingchunkiter(fd):
    """Yield the chunks of file object `fd`, closing it afterwards."""
    try:
        for chunk in util.filechunkiter(fd):
            yield chunk
    finally:
        fd.close()

def cachewriter(repo, bundlename, stream):
    """Yield the chunks of `stream` while writing them to the bundle cache.

    The bundle is written through `util.atomictempfile`; presumably an
    interrupted stream therefore leaves no partial bundle behind (confirm
    against mercurial.util).
    """
    cdir = cachedir(repo)
    bundlepath = os.path.join(cdir, bundlename)
    try:
        os.makedirs(cdir)
    except OSError as exc:
        # an existing cache directory is fine; any other failure (e.g.
        # permissions) is reported instead of being silently dropped, since
        # creating the cache file below would most likely fail anyway.
        if exc.errno != errno.EEXIST:
            raise
    with util.atomictempfile(bundlepath) as cachefile:
        for chunk in stream:
            cachefile.write(chunk)
            yield chunk

# cache file name: <cg version>-<head node hex>-<skip>skip-<size>size.hg
BUNDLEMASK = "%s-%s-%010iskip-%010isize.hg"

def makeonecgpart(newpart, repo, rangeid, outgoing, version, source,
                  bundlecaps, filematcher, cgversions):
    """Add a single changegroup part for `outgoing`.

    When `rangeid` is given, the part is served from the bundle cache if
    present; otherwise it is generated and written to the cache while being
    streamed. With `rangeid` None the part is generated without caching.
    """
    bundlename = cachedata = None
    if rangeid is not None:
        nbchanges = repo.stablerange.rangelength(repo, rangeid)
        headnode = nodemod.hex(repo.changelog.node(rangeid[0]))
        # XXX do we need to use cgversion in there?
        bundlename = BUNDLEMASK % (version, headnode, rangeid[1], nbchanges)
        cachedata = getcache(repo, bundlename)
    if cachedata is None:
        partdata = _makenewstream(newpart, repo, outgoing, version, source,
                                  bundlecaps, filematcher, cgversions)
        if bundlename is not None:
            cgstream = cachewriter(repo, bundlename, partdata[0])
            partdata = (cgstream,) + partdata[1:]
    else:
        if repo.ui.verbose or source == 'bundle':
            repo.ui.status(_("%d changesets found in caches\n") % nbchanges)
        pversion = None
        if cgversions:
            pversion = version
        partdata = (cachedata, nbchanges, pversion)
    return _makepartfromstream(newpart, repo, *partdata)