Mercurial > evolve
changeset 821:202376586cf6
exchange: introduce a wireprotocol command to push markers
Pushkey is not adapted to transmit a stream of binary data. We used it as a hack
so far. We add a new wire protocol command to speed the things up (and also push
all the markers in a single transaction). The proper way to do it is to
introduce bundle2 in core, but this will already help in the mean time.
author | Pierre-Yves David <pierre-yves.david@fb.com> |
---|---|
date | Fri, 28 Feb 2014 00:40:29 -0800 |
parents | a9a66143e2ec |
children | 5f5d269278e9 |
files | hgext/evolve.py |
diffstat | 1 files changed, 75 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/hgext/evolve.py Fri Feb 28 10:30:10 2014 -0800 +++ b/hgext/evolve.py Fri Feb 28 00:40:29 2014 -0800 @@ -24,6 +24,7 @@ import sys import random +from cStringIO import StringIO import struct import mercurial @@ -2103,7 +2104,10 @@ keys['dump%i' % idx] = base85.b85encode(data) return keys - +def _encodemarkersstream(fp, markers): + fp.write(_pack('>B', 0)) + for mark in markers: + fp.write(obsolete._encodeonemarker(mark)) @eh.wrapfunction(exchange, '_pushobsolete') def _pushobsolete(orig, pushop): @@ -2119,22 +2123,74 @@ repo.ui.status("OBSEXC: computing markers relevant to %i nodes\n" % len(nodes)) markers = repo.obsstore.relevantmarkers(nodes) - rslts = [] - repo.ui.status("OBSEXC: encoding %i markers\n" % len(markers)) - remotedata = _pushkeyescape(markers).items() - totalbytes = sum(len(d) for k,d in remotedata) - sentbytes = 0 - repo.ui.status("OBSEXC: sending %i pushkey payload (%i bytes)\n" - % (len(remotedata), totalbytes)) - for key, data in remotedata: - repo.ui.progress('OBSEXC', sentbytes, item=key, unit="bytes", - total=totalbytes) - rslts.append(remote.pushkey('obsolete', key, '', data)) - sentbytes += len(data) - repo.ui.progress('OBSEXC', sentbytes, item=key, unit="bytes", - total=totalbytes) - repo.ui.progress('OBSEXC', None) - if [r for r in rslts if not r]: - msg = _('failed to push some obsolete markers!\n') - repo.ui.warn(msg) + if remote.capable('_evoext_pushobsmarkers_0'): + repo.ui.status("OBSEXC: writing %i markers\n" % len(markers)) + obsdata = StringIO() + _encodemarkersstream(obsdata, markers) + obsdata.seek(0) + repo.ui.status("OBSEXC: pushing %i bytes\n" + % len(obsdata.getvalue())) + remote.evoext_pushobsmarkers_0(obsdata) + else: + rslts = [] + repo.ui.status("OBSEXC: encoding %i markers\n" % len(markers)) + remotedata = _pushkeyescape(markers).items() + totalbytes = sum(len(d) for k,d in remotedata) + sentbytes = 0 + repo.ui.status("OBSEXC: sending %i pushkey payload (%i bytes)\n" + % (len(remotedata), totalbytes)) + for key, data in remotedata: + repo.ui.progress('OBSEXC', sentbytes, item=key, unit="bytes", + total=totalbytes) + rslts.append(remote.pushkey('obsolete', key, '', data)) + sentbytes += len(data) + repo.ui.progress('OBSEXC', sentbytes, item=key, unit="bytes", + total=totalbytes) + repo.ui.progress('OBSEXC', None) + if [r for r in rslts if not r]: + msg = _('failed to push some obsolete markers!\n') + repo.ui.warn(msg) repo.ui.status("OBSEXC: DONE\n") + + +@eh.addattr(wireproto.wirepeer, 'evoext_pushobsmarkers_0') +def client_pushobsmarkers(self, obsfile): + """wireprotocol peer method""" + self.requirecap('_evoext_pushobsmarkers_0', + _('push obsolete markers faster')) + ret, output = self._callpush('evoext_pushobsmarkers_0', obsfile) + for l in output.splitlines(True): + self.ui.status(_('remote: '), l) + return ret + + +def srv_pushobsmarkers(repo, proto): + """wireprotocol command""" + fp = StringIO() + proto.redirect() + proto.getfile(fp) + data = fp.getvalue() + fp.close() + lock = repo.lock() + try: + tr = repo.transaction('pushkey: obsolete markers') + try: + repo.obsstore.mergemarkers(tr, data) + tr.close() + finally: + tr.release() + finally: + lock.release() + return wireproto.pushres(0) + +@eh.wrapfunction(wireproto, 'capabilities') +def capabilities(orig, repo, proto): + """wrapper to advertise new capability""" + caps = orig(repo, proto) + if obsolete._enabled: + caps += ' _evoext_pushobsmarkers_0' + return caps + +@eh.extsetup +def _installwireprotocol(ui): + wireproto.commands['evoext_pushobsmarkers_0'] = (srv_pushobsmarkers, '')