Mercurial > evolve
changeset 2044:d31ad31e456b
exchange: move code related to exchange into a 'evolve.exchange' submodule
The evolve extension is HUGE, we split exchange code appart before doing more work on it.
author | Pierre-Yves David <pierre-yves.david@ens-lyon.org> |
---|---|
date | Sat, 04 Mar 2017 02:56:50 +0100 |
parents | c64300906a32 |
children | db617700d318 |
files | hgext3rd/evolve/__init__.py hgext3rd/evolve/exchange.py hgext3rd/evolve/exthelper.py |
diffstat | 3 files changed, 416 insertions(+), 390 deletions(-) [+] |
line wrap: on
line diff
--- a/hgext3rd/evolve/__init__.py Sat Mar 04 03:37:32 2017 +0100 +++ b/hgext3rd/evolve/__init__.py Sat Mar 04 02:56:50 2017 +0100 @@ -62,7 +62,6 @@ import random import re import collections -import socket import errno import struct @@ -96,12 +95,9 @@ context, copies, error, - exchange, extensions, help, hg, - httppeer, - localrepo, lock as lockmod, merge, node, @@ -110,7 +106,6 @@ revset, scmutil, templatekw, - wireproto ) from mercurial.commands import walkopts, commitopts, commitopts2, mergetoolopts @@ -119,6 +114,7 @@ from . import ( exthelper, + exchange, serveronly, ) @@ -148,6 +144,7 @@ eh = exthelper.exthelper() +eh.merge(exchange.eh) uisetup = eh.final_uisetup extsetup = eh.final_extsetup reposetup = eh.final_reposetup @@ -3075,389 +3072,6 @@ entry[1].append(('O', 'old-obsolete', False, _("make graft obsoletes its source (DEPRECATED)"))) -##################################################################### -### Obsolescence marker exchange experimenation ### -##################################################################### - -def obsexcprg(ui, *args, **kwargs): - topic = 'obsmarkers exchange' - if ui.configbool('experimental', 'verbose-obsolescence-exchange', False): - topic = 'OBSEXC' - ui.progress(topic, *args, **kwargs) - -@eh.wrapfunction(exchange, '_pushdiscoveryobsmarkers') -def _pushdiscoveryobsmarkers(orig, pushop): - if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt) - and pushop.repo.obsstore - and 'obsolete' in pushop.remote.listkeys('namespaces')): - repo = pushop.repo - obsexcmsg(repo.ui, "computing relevant nodes\n") - revs = list(repo.revs('::%ln', pushop.futureheads)) - unfi = repo.unfiltered() - cl = unfi.changelog - if not pushop.remote.capable('_evoext_obshash_0'): - # do not trust core yet - # return orig(pushop) - nodes = [cl.node(r) for r in revs] - if nodes: - obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n" - % len(nodes)) - pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes) - else: - obsexcmsg(repo.ui, "markers already in sync\n") - pushop.outobsmarkers = [] - pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes) - return - - common = [] - obsexcmsg(repo.ui, "looking for common markers in %i nodes\n" - % len(revs)) - commonrevs = list(unfi.revs('::%ln', pushop.outgoing.commonheads)) - common = findcommonobsmarkers(pushop.ui, unfi, pushop.remote, - commonrevs) - - revs = list(unfi.revs('%ld - (::%ln)', revs, common)) - nodes = [cl.node(r) for r in revs] - if nodes: - obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n" - % len(nodes)) - pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes) - else: - obsexcmsg(repo.ui, "markers already in sync\n") - pushop.outobsmarkers = [] - -@eh.extsetup -def _installobsmarkersdiscovery(ui): - olddisco = exchange.pushdiscoverymapping['obsmarker'] - - def newdisco(pushop): - _pushdiscoveryobsmarkers(olddisco, pushop) - exchange.pushdiscoverymapping['obsmarker'] = newdisco - -### Set discovery START - -from mercurial import dagutil -from mercurial import setdiscovery - -@eh.addattr(localrepo.localpeer, 'evoext_obshash') -def local_obshash(peer, nodes): - return serveronly._obshash(peer._repo, nodes) - -@eh.addattr(localrepo.localpeer, 'evoext_obshash1') -def local_obshash1(peer, nodes): - return serveronly._obshash(peer._repo, nodes, version=1) - -@eh.addattr(wireproto.wirepeer, 'evoext_obshash') -def peer_obshash(self, nodes): - d = self._call("evoext_obshash", nodes=wireproto.encodelist(nodes)) - try: - return wireproto.decodelist(d) - except ValueError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - -@eh.addattr(wireproto.wirepeer, 'evoext_obshash1') -def peer_obshash1(self, nodes): - d = self._call("evoext_obshash1", nodes=wireproto.encodelist(nodes)) - try: - return wireproto.decodelist(d) - except ValueError: - self._abort(error.ResponseError(_("unexpected response:"), d)) - -def findcommonobsmarkers(ui, local, remote, probeset, - initialsamplesize=100, - fullsamplesize=200): - # from discovery - roundtrips = 0 - cl = local.changelog - dag = dagutil.revlogdag(cl) - missing = set() - common = set() - undecided = set(probeset) - totalnb = len(undecided) - ui.progress(_("comparing with other"), 0, total=totalnb) - _takefullsample = setdiscovery._takefullsample - if remote.capable('_evoext_obshash_1'): - getremotehash = remote.evoext_obshash1 - localhash = serveronly._obsrelsethashtreefm1(local) - else: - getremotehash = remote.evoext_obshash - localhash = serveronly._obsrelsethashtreefm0(local) - - while undecided: - - ui.note(_("sampling from both directions\n")) - if len(undecided) < fullsamplesize: - sample = set(undecided) - else: - sample = _takefullsample(dag, undecided, size=fullsamplesize) - - roundtrips += 1 - ui.progress(_("comparing with other"), totalnb - len(undecided), - total=totalnb) - ui.debug("query %i; still undecided: %i, sample size is: %i\n" - % (roundtrips, len(undecided), len(sample))) - # indices between sample and externalized version must match - sample = list(sample) - remotehash = getremotehash(dag.externalizeall(sample)) - - yesno = [localhash[ix][1] == remotehash[si] - for si, ix in enumerate(sample)] - - commoninsample = set(n for i, n in enumerate(sample) if yesno[i]) - common.update(dag.ancestorset(commoninsample, common)) - - missinginsample = [n for i, n in enumerate(sample) if not yesno[i]] - missing.update(dag.descendantset(missinginsample, missing)) - - undecided.difference_update(missing) - undecided.difference_update(common) - - ui.progress(_("comparing with other"), None) - result = dag.headsetofconnecteds(common) - ui.debug("%d total queries\n" % roundtrips) - - if not result: - return set([nullid]) - return dag.externalizeall(result) - - -_pushkeyescape = getattr(obsolete, '_pushkeyescape', None) - -class pushobsmarkerStringIO(StringIO): - """hacky string io for progress""" - - @util.propertycache - def length(self): - return len(self.getvalue()) - - def read(self, size=None): - obsexcprg(self.ui, self.tell(), unit=_("bytes"), total=self.length) - return StringIO.read(self, size) - - def __iter__(self): - d = self.read(4096) - while d: - yield d - d = self.read(4096) - -@eh.wrapfunction(exchange, '_pushobsolete') -def _pushobsolete(orig, pushop): - """utility function to push obsolete markers to a remote""" - stepsdone = getattr(pushop, 'stepsdone', None) - if stepsdone is not None: - if 'obsmarkers' in stepsdone: - return - stepsdone.add('obsmarkers') - if pushop.cgresult == 0: - return - pushop.ui.debug('try to push obsolete markers to remote\n') - repo = pushop.repo - remote = pushop.remote - if (obsolete.isenabled(repo, obsolete.exchangeopt) and repo.obsstore and - 'obsolete' in remote.listkeys('namespaces')): - markers = pushop.outobsmarkers - if not markers: - obsexcmsg(repo.ui, "no marker to push\n") - elif remote.capable('_evoext_pushobsmarkers_0'): - obsdata = pushobsmarkerStringIO() - for chunk in obsolete.encodemarkers(markers, True): - obsdata.write(chunk) - obsdata.seek(0) - obsdata.ui = repo.ui - obsexcmsg(repo.ui, "pushing %i obsolescence markers (%i bytes)\n" - % (len(markers), len(obsdata.getvalue())), - True) - remote.evoext_pushobsmarkers_0(obsdata) - obsexcprg(repo.ui, None) - else: - rslts = [] - remotedata = _pushkeyescape(markers).items() - totalbytes = sum(len(d) for k, d in remotedata) - sentbytes = 0 - obsexcmsg(repo.ui, "pushing %i obsolescence markers in %i " - "pushkey payload (%i bytes)\n" - % (len(markers), len(remotedata), totalbytes), - True) - for key, data in remotedata: - obsexcprg(repo.ui, sentbytes, item=key, unit=_("bytes"), - total=totalbytes) - rslts.append(remote.pushkey('obsolete', key, '', data)) - sentbytes += len(data) - obsexcprg(repo.ui, sentbytes, item=key, unit=_("bytes"), - total=totalbytes) - obsexcprg(repo.ui, None) - if [r for r in rslts if not r]: - msg = _('failed to push some obsolete markers!\n') - repo.ui.warn(msg) - obsexcmsg(repo.ui, "DONE\n") - - -@eh.addattr(wireproto.wirepeer, 'evoext_pushobsmarkers_0') -def client_pushobsmarkers(self, obsfile): - """wireprotocol peer method""" - self.requirecap('_evoext_pushobsmarkers_0', - _('push obsolete markers faster')) - ret, output = self._callpush('evoext_pushobsmarkers_0', obsfile) - for l in output.splitlines(True): - self.ui.status(_('remote: '), l) - return ret - -@eh.addattr(httppeer.httppeer, 'evoext_pushobsmarkers_0') -def httpclient_pushobsmarkers(self, obsfile): - """httpprotocol peer method - (Cannot simply use _callpush as http is doing some special handling)""" - self.requirecap('_evoext_pushobsmarkers_0', - _('push obsolete markers faster')) - try: - r = self._call('evoext_pushobsmarkers_0', data=obsfile) - vals = r.split('\n', 1) - if len(vals) < 2: - raise error.ResponseError(_("unexpected response:"), r) - - for l in vals[1].splitlines(True): - if l.strip(): - self.ui.status(_('remote: '), l) - return vals[0] - except socket.error as err: - if err.args[0] in (errno.ECONNRESET, errno.EPIPE): - raise error.Abort(_('push failed: %s') % err.args[1]) - raise error.Abort(err.args[1]) - -@eh.wrapfunction(localrepo.localrepository, '_restrictcapabilities') -def local_pushobsmarker_capabilities(orig, repo, caps): - caps = orig(repo, caps) - caps.add('_evoext_pushobsmarkers_0') - return caps - -@eh.addattr(localrepo.localpeer, 'evoext_pushobsmarkers_0') -def local_pushobsmarkers(peer, obsfile): - data = obsfile.read() - serveronly._pushobsmarkers(peer._repo, data) - -def _buildpullobsmarkersboundaries(pullop): - """small funtion returning the argument for pull markers call - may to contains 'heads' and 'common'. skip the key for None. - - Its a separed functio to play around with strategy for that.""" - repo = pullop.repo - remote = pullop.remote - unfi = repo.unfiltered() - revs = unfi.revs('::(%ln - null)', pullop.common) - common = [nullid] - if remote.capable('_evoext_obshash_0'): - obsexcmsg(repo.ui, "looking for common markers in %i nodes\n" - % len(revs)) - common = findcommonobsmarkers(repo.ui, repo, remote, revs) - return {'heads': pullop.pulledsubset, 'common': common} - -@eh.uisetup -def addgetbundleargs(self): - wireproto.gboptsmap['evo_obscommon'] = 'nodes' - -@eh.wrapfunction(exchange, '_pullbundle2extraprepare') -def _addobscommontob2pull(orig, pullop, kwargs): - ret = orig(pullop, kwargs) - if ('obsmarkers' in kwargs and - pullop.remote.capable('_evoext_getbundle_obscommon')): - boundaries = _buildpullobsmarkersboundaries(pullop) - common = boundaries['common'] - if common != [nullid]: - kwargs['evo_obscommon'] = common - return ret - -@eh.wrapfunction(exchange, '_pullobsolete') -def _pullobsolete(orig, pullop): - if not obsolete.isenabled(pullop.repo, obsolete.exchangeopt): - return None - if 'obsmarkers' not in getattr(pullop, 'todosteps', ['obsmarkers']): - return None - if 'obsmarkers' in getattr(pullop, 'stepsdone', []): - return None - wirepull = pullop.remote.capable('_evoext_pullobsmarkers_0') - if not wirepull: - return orig(pullop) - if 'obsolete' not in pullop.remote.listkeys('namespaces'): - return None # remote opted out of obsolescence marker exchange - tr = None - ui = pullop.repo.ui - boundaries = _buildpullobsmarkersboundaries(pullop) - if not set(boundaries['heads']) - set(boundaries['common']): - obsexcmsg(ui, "nothing to pull\n") - return None - - obsexcmsg(ui, "pull obsolescence markers\n", True) - new = 0 - - if wirepull: - obsdata = pullop.remote.evoext_pullobsmarkers_0(**boundaries) - obsdata = obsdata.read() - if len(obsdata) > 5: - msg = "merging obsolescence markers (%i bytes)\n" % len(obsdata) - obsexcmsg(ui, msg) - tr = pullop.gettransaction() - old = len(pullop.repo.obsstore._all) - pullop.repo.obsstore.mergemarkers(tr, obsdata) - new = len(pullop.repo.obsstore._all) - old - obsexcmsg(ui, "%i obsolescence markers added\n" % new, True) - else: - obsexcmsg(ui, "no unknown remote markers\n") - obsexcmsg(ui, "DONE\n") - if new: - pullop.repo.invalidatevolatilesets() - return tr - -@eh.addattr(wireproto.wirepeer, 'evoext_pullobsmarkers_0') -def client_pullobsmarkers(self, heads=None, common=None): - self.requirecap('_evoext_pullobsmarkers_0', _('look up remote obsmarkers')) - opts = {} - if heads is not None: - opts['heads'] = wireproto.encodelist(heads) - if common is not None: - opts['common'] = wireproto.encodelist(common) - f = self._callcompressable("evoext_pullobsmarkers_0", **opts) - length = int(f.read(20)) - chunk = 4096 - current = 0 - data = StringIO() - ui = self.ui - obsexcprg(ui, current, unit=_("bytes"), total=length) - while current < length: - readsize = min(length - current, chunk) - data.write(f.read(readsize)) - current += readsize - obsexcprg(ui, current, unit=_("bytes"), total=length) - obsexcprg(ui, None) - data.seek(0) - return data - -@eh.addattr(localrepo.localpeer, 'evoext_pullobsmarkers_0') -def local_pullobsmarkers(self, heads=None, common=None): - return serveronly._getobsmarkersstream(self._repo, heads=heads, - common=common) - -@eh.command( - 'debugobsrelsethashtree', - [('', 'v0', None, 'hash on marker format "0"'), - ('', 'v1', None, 'hash on marker format "1" (default)')], _('')) -def debugobsrelsethashtree(ui, repo, v0=False, v1=False): - """display Obsolete markers, Relevant Set, Hash Tree - changeset-node obsrelsethashtree-node - - It computed form the "orsht" of its parent and markers - relevant to the changeset itself.""" - if v0 and v1: - raise error.Abort('cannot only specify one format') - elif v0: - treefunc = serveronly._obsrelsethashtreefm0 - else: - treefunc = serveronly._obsrelsethashtreefm1 - - for chg, obs in treefunc(repo): - ui.status('%s %s\n' % (node.hex(chg), node.hex(obs))) - -_bestformat = max(obsolete.formats.keys()) - - @eh.wrapfunction(obsolete, '_checkinvalidmarkers') def _checkinvalidmarkers(orig, markers): """search for marker with invalid data and raise error if needed @@ -3475,7 +3089,7 @@ @eh.command( 'debugobsconvert', - [('', 'new-format', _bestformat, _('Destination format for markers.'))], + [('', 'new-format', exchange._bestformat, _('Destination format for markers.'))], '') def debugobsconvert(ui, repo, new_format): origmarkers = repo.obsstore._all # settle version
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/hgext3rd/evolve/exchange.py Sat Mar 04 02:56:50 2017 +0100 @@ -0,0 +1,413 @@ +##################################################################### +### Obsolescence marker exchange experimenation ### +##################################################################### + +from __future__ import absolute_import + +try: + import StringIO as io + StringIO = io.StringIO +except ImportError: + import io + StringIO = io.StringIO + +import errno +import socket + +from mercurial import ( + error, + exchange, + httppeer, + localrepo, + node, + obsolete, + util, + wireproto, +) +from mercurial.i18n import _ + +from . import ( + exthelper, + serveronly, +) + +eh = exthelper.exthelper() +obsexcmsg = serveronly.obsexcmsg + +def obsexcprg(ui, *args, **kwargs): + topic = 'obsmarkers exchange' + if ui.configbool('experimental', 'verbose-obsolescence-exchange', False): + topic = 'OBSEXC' + ui.progress(topic, *args, **kwargs) + +@eh.wrapfunction(exchange, '_pushdiscoveryobsmarkers') +def _pushdiscoveryobsmarkers(orig, pushop): + if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt) + and pushop.repo.obsstore + and 'obsolete' in pushop.remote.listkeys('namespaces')): + repo = pushop.repo + obsexcmsg(repo.ui, "computing relevant nodes\n") + revs = list(repo.revs('::%ln', pushop.futureheads)) + unfi = repo.unfiltered() + cl = unfi.changelog + if not pushop.remote.capable('_evoext_obshash_0'): + # do not trust core yet + # return orig(pushop) + nodes = [cl.node(r) for r in revs] + if nodes: + obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n" + % len(nodes)) + pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes) + else: + obsexcmsg(repo.ui, "markers already in sync\n") + pushop.outobsmarkers = [] + pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes) + return + + common = [] + obsexcmsg(repo.ui, "looking for common markers in %i nodes\n" + % len(revs)) + commonrevs = list(unfi.revs('::%ln', pushop.outgoing.commonheads)) + common = findcommonobsmarkers(pushop.ui, unfi, pushop.remote, + commonrevs) + + revs = list(unfi.revs('%ld - (::%ln)', revs, common)) + nodes = [cl.node(r) for r in revs] + if nodes: + obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n" + % len(nodes)) + pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes) + else: + obsexcmsg(repo.ui, "markers already in sync\n") + pushop.outobsmarkers = [] + +@eh.extsetup +def _installobsmarkersdiscovery(ui): + olddisco = exchange.pushdiscoverymapping['obsmarker'] + + def newdisco(pushop): + _pushdiscoveryobsmarkers(olddisco, pushop) + exchange.pushdiscoverymapping['obsmarker'] = newdisco + +### Set discovery START + +from mercurial import dagutil +from mercurial import setdiscovery + +@eh.addattr(localrepo.localpeer, 'evoext_obshash') +def local_obshash(peer, nodes): + return serveronly._obshash(peer._repo, nodes) + +@eh.addattr(localrepo.localpeer, 'evoext_obshash1') +def local_obshash1(peer, nodes): + return serveronly._obshash(peer._repo, nodes, version=1) + +@eh.addattr(wireproto.wirepeer, 'evoext_obshash') +def peer_obshash(self, nodes): + d = self._call("evoext_obshash", nodes=wireproto.encodelist(nodes)) + try: + return wireproto.decodelist(d) + except ValueError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + +@eh.addattr(wireproto.wirepeer, 'evoext_obshash1') +def peer_obshash1(self, nodes): + d = self._call("evoext_obshash1", nodes=wireproto.encodelist(nodes)) + try: + return wireproto.decodelist(d) + except ValueError: + self._abort(error.ResponseError(_("unexpected response:"), d)) + +def findcommonobsmarkers(ui, local, remote, probeset, + initialsamplesize=100, + fullsamplesize=200): + # from discovery + roundtrips = 0 + cl = local.changelog + dag = dagutil.revlogdag(cl) + missing = set() + common = set() + undecided = set(probeset) + totalnb = len(undecided) + ui.progress(_("comparing with other"), 0, total=totalnb) + _takefullsample = setdiscovery._takefullsample + if remote.capable('_evoext_obshash_1'): + getremotehash = remote.evoext_obshash1 + localhash = serveronly._obsrelsethashtreefm1(local) + else: + getremotehash = remote.evoext_obshash + localhash = serveronly._obsrelsethashtreefm0(local) + + while undecided: + + ui.note(_("sampling from both directions\n")) + if len(undecided) < fullsamplesize: + sample = set(undecided) + else: + sample = _takefullsample(dag, undecided, size=fullsamplesize) + + roundtrips += 1 + ui.progress(_("comparing with other"), totalnb - len(undecided), + total=totalnb) + ui.debug("query %i; still undecided: %i, sample size is: %i\n" + % (roundtrips, len(undecided), len(sample))) + # indices between sample and externalized version must match + sample = list(sample) + remotehash = getremotehash(dag.externalizeall(sample)) + + yesno = [localhash[ix][1] == remotehash[si] + for si, ix in enumerate(sample)] + + commoninsample = set(n for i, n in enumerate(sample) if yesno[i]) + common.update(dag.ancestorset(commoninsample, common)) + + missinginsample = [n for i, n in enumerate(sample) if not yesno[i]] + missing.update(dag.descendantset(missinginsample, missing)) + + undecided.difference_update(missing) + undecided.difference_update(common) + + ui.progress(_("comparing with other"), None) + result = dag.headsetofconnecteds(common) + ui.debug("%d total queries\n" % roundtrips) + + if not result: + return set([node.nullid]) + return dag.externalizeall(result) + + +_pushkeyescape = getattr(obsolete, '_pushkeyescape', None) + +class pushobsmarkerStringIO(StringIO): + """hacky string io for progress""" + + @util.propertycache + def length(self): + return len(self.getvalue()) + + def read(self, size=None): + obsexcprg(self.ui, self.tell(), unit=_("bytes"), total=self.length) + return StringIO.read(self, size) + + def __iter__(self): + d = self.read(4096) + while d: + yield d + d = self.read(4096) + +@eh.wrapfunction(exchange, '_pushobsolete') +def _pushobsolete(orig, pushop): + """utility function to push obsolete markers to a remote""" + stepsdone = getattr(pushop, 'stepsdone', None) + if stepsdone is not None: + if 'obsmarkers' in stepsdone: + return + stepsdone.add('obsmarkers') + if pushop.cgresult == 0: + return + pushop.ui.debug('try to push obsolete markers to remote\n') + repo = pushop.repo + remote = pushop.remote + if (obsolete.isenabled(repo, obsolete.exchangeopt) and repo.obsstore and + 'obsolete' in remote.listkeys('namespaces')): + markers = pushop.outobsmarkers + if not markers: + obsexcmsg(repo.ui, "no marker to push\n") + elif remote.capable('_evoext_pushobsmarkers_0'): + obsdata = pushobsmarkerStringIO() + for chunk in obsolete.encodemarkers(markers, True): + obsdata.write(chunk) + obsdata.seek(0) + obsdata.ui = repo.ui + obsexcmsg(repo.ui, "pushing %i obsolescence markers (%i bytes)\n" + % (len(markers), len(obsdata.getvalue())), + True) + remote.evoext_pushobsmarkers_0(obsdata) + obsexcprg(repo.ui, None) + else: + rslts = [] + remotedata = _pushkeyescape(markers).items() + totalbytes = sum(len(d) for k, d in remotedata) + sentbytes = 0 + obsexcmsg(repo.ui, "pushing %i obsolescence markers in %i " + "pushkey payload (%i bytes)\n" + % (len(markers), len(remotedata), totalbytes), + True) + for key, data in remotedata: + obsexcprg(repo.ui, sentbytes, item=key, unit=_("bytes"), + total=totalbytes) + rslts.append(remote.pushkey('obsolete', key, '', data)) + sentbytes += len(data) + obsexcprg(repo.ui, sentbytes, item=key, unit=_("bytes"), + total=totalbytes) + obsexcprg(repo.ui, None) + if [r for r in rslts if not r]: + msg = _('failed to push some obsolete markers!\n') + repo.ui.warn(msg) + obsexcmsg(repo.ui, "DONE\n") + + +@eh.addattr(wireproto.wirepeer, 'evoext_pushobsmarkers_0') +def client_pushobsmarkers(self, obsfile): + """wireprotocol peer method""" + self.requirecap('_evoext_pushobsmarkers_0', + _('push obsolete markers faster')) + ret, output = self._callpush('evoext_pushobsmarkers_0', obsfile) + for l in output.splitlines(True): + self.ui.status(_('remote: '), l) + return ret + +@eh.addattr(httppeer.httppeer, 'evoext_pushobsmarkers_0') +def httpclient_pushobsmarkers(self, obsfile): + """httpprotocol peer method + (Cannot simply use _callpush as http is doing some special handling)""" + self.requirecap('_evoext_pushobsmarkers_0', + _('push obsolete markers faster')) + try: + r = self._call('evoext_pushobsmarkers_0', data=obsfile) + vals = r.split('\n', 1) + if len(vals) < 2: + raise error.ResponseError(_("unexpected response:"), r) + + for l in vals[1].splitlines(True): + if l.strip(): + self.ui.status(_('remote: '), l) + return vals[0] + except socket.error as err: + if err.args[0] in (errno.ECONNRESET, errno.EPIPE): + raise error.Abort(_('push failed: %s') % err.args[1]) + raise error.Abort(err.args[1]) + +@eh.wrapfunction(localrepo.localrepository, '_restrictcapabilities') +def local_pushobsmarker_capabilities(orig, repo, caps): + caps = orig(repo, caps) + caps.add('_evoext_pushobsmarkers_0') + return caps + +@eh.addattr(localrepo.localpeer, 'evoext_pushobsmarkers_0') +def local_pushobsmarkers(peer, obsfile): + data = obsfile.read() + serveronly._pushobsmarkers(peer._repo, data) + +def _buildpullobsmarkersboundaries(pullop): + """small funtion returning the argument for pull markers call + may to contains 'heads' and 'common'. skip the key for None. + + Its a separed functio to play around with strategy for that.""" + repo = pullop.repo + remote = pullop.remote + unfi = repo.unfiltered() + revs = unfi.revs('::(%ln - null)', pullop.common) + common = [node.nullid] + if remote.capable('_evoext_obshash_0'): + obsexcmsg(repo.ui, "looking for common markers in %i nodes\n" + % len(revs)) + common = findcommonobsmarkers(repo.ui, repo, remote, revs) + return {'heads': pullop.pulledsubset, 'common': common} + +@eh.uisetup +def addgetbundleargs(self): + wireproto.gboptsmap['evo_obscommon'] = 'nodes' + +@eh.wrapfunction(exchange, '_pullbundle2extraprepare') +def _addobscommontob2pull(orig, pullop, kwargs): + ret = orig(pullop, kwargs) + if ('obsmarkers' in kwargs and + pullop.remote.capable('_evoext_getbundle_obscommon')): + boundaries = _buildpullobsmarkersboundaries(pullop) + common = boundaries['common'] + if common != [node.nullid]: + kwargs['evo_obscommon'] = common + return ret + +@eh.wrapfunction(exchange, '_pullobsolete') +def _pullobsolete(orig, pullop): + if not obsolete.isenabled(pullop.repo, obsolete.exchangeopt): + return None + if 'obsmarkers' not in getattr(pullop, 'todosteps', ['obsmarkers']): + return None + if 'obsmarkers' in getattr(pullop, 'stepsdone', []): + return None + wirepull = pullop.remote.capable('_evoext_pullobsmarkers_0') + if not wirepull: + return orig(pullop) + if 'obsolete' not in pullop.remote.listkeys('namespaces'): + return None # remote opted out of obsolescence marker exchange + tr = None + ui = pullop.repo.ui + boundaries = _buildpullobsmarkersboundaries(pullop) + if not set(boundaries['heads']) - set(boundaries['common']): + obsexcmsg(ui, "nothing to pull\n") + return None + + obsexcmsg(ui, "pull obsolescence markers\n", True) + new = 0 + + if wirepull: + obsdata = pullop.remote.evoext_pullobsmarkers_0(**boundaries) + obsdata = obsdata.read() + if len(obsdata) > 5: + msg = "merging obsolescence markers (%i bytes)\n" % len(obsdata) + obsexcmsg(ui, msg) + tr = pullop.gettransaction() + old = len(pullop.repo.obsstore._all) + pullop.repo.obsstore.mergemarkers(tr, obsdata) + new = len(pullop.repo.obsstore._all) - old + obsexcmsg(ui, "%i obsolescence markers added\n" % new, True) + else: + obsexcmsg(ui, "no unknown remote markers\n") + obsexcmsg(ui, "DONE\n") + if new: + pullop.repo.invalidatevolatilesets() + return tr + +@eh.addattr(wireproto.wirepeer, 'evoext_pullobsmarkers_0') +def client_pullobsmarkers(self, heads=None, common=None): + self.requirecap('_evoext_pullobsmarkers_0', _('look up remote obsmarkers')) + opts = {} + if heads is not None: + opts['heads'] = wireproto.encodelist(heads) + if common is not None: + opts['common'] = wireproto.encodelist(common) + f = self._callcompressable("evoext_pullobsmarkers_0", **opts) + length = int(f.read(20)) + chunk = 4096 + current = 0 + data = StringIO() + ui = self.ui + obsexcprg(ui, current, unit=_("bytes"), total=length) + while current < length: + readsize = min(length - current, chunk) + data.write(f.read(readsize)) + current += readsize + obsexcprg(ui, current, unit=_("bytes"), total=length) + obsexcprg(ui, None) + data.seek(0) + return data + +@eh.addattr(localrepo.localpeer, 'evoext_pullobsmarkers_0') +def local_pullobsmarkers(self, heads=None, common=None): + return serveronly._getobsmarkersstream(self._repo, heads=heads, + common=common) + +@eh.command( + 'debugobsrelsethashtree', + [('', 'v0', None, 'hash on marker format "0"'), + ('', 'v1', None, 'hash on marker format "1" (default)')], _('')) +def debugobsrelsethashtree(ui, repo, v0=False, v1=False): + """display Obsolete markers, Relevant Set, Hash Tree + changeset-node obsrelsethashtree-node + + It computed form the "orsht" of its parent and markers + relevant to the changeset itself.""" + if v0 and v1: + raise error.Abort('cannot only specify one format') + elif v0: + treefunc = serveronly._obsrelsethashtreefm0 + else: + treefunc = serveronly._obsrelsethashtreefm1 + + for chg, obs in treefunc(repo): + ui.status('%s %s\n' % (node.hex(chg), node.hex(obs))) + +_bestformat = max(obsolete.formats.keys())
--- a/hgext3rd/evolve/exthelper.py Sat Mar 04 03:37:32 2017 +0100 +++ b/hgext3rd/evolve/exthelper.py Sat Mar 04 02:56:50 2017 +0100 @@ -33,7 +33,6 @@ self.command = cmdutil.command(self.cmdtable) def merge(self, other): - """merge the data collected by another exthelper into this one""" self._uicallables.extend(other._uicallables) self._extcallables.extend(other._extcallables) self._repocallables.extend(other._repocallables)