diff hgext/evolve.py @ 821:202376586cf6

exchange: introduce a wireprotocol command to push markers Pushkey is not adapted to transmit a stream of binary data. We used it as a hack so far. We add a new wire protocol command to speed the things up (and also push all the markers in a single transaction). The proper way to do it is to introduce bundle2 in core, but this will already help in the mean time.
author Pierre-Yves David <pierre-yves.david@fb.com>
date Fri, 28 Feb 2014 00:40:29 -0800
parents a9a66143e2ec
children 9aa20585e158
line wrap: on
line diff
--- a/hgext/evolve.py	Fri Feb 28 10:30:10 2014 -0800
+++ b/hgext/evolve.py	Fri Feb 28 00:40:29 2014 -0800
@@ -24,6 +24,7 @@
 
 import sys
 import random
+from cStringIO import StringIO
 import struct
 
 import mercurial
@@ -2103,7 +2104,10 @@
             keys['dump%i' % idx] = base85.b85encode(data)
         return keys
 
-
+def _encodemarkersstream(fp, markers):
+    fp.write(_pack('>B', 0))
+    for mark in markers:
+        fp.write(obsolete._encodeonemarker(mark))
 
 @eh.wrapfunction(exchange, '_pushobsolete')
 def _pushobsolete(orig, pushop):
@@ -2119,22 +2123,74 @@
         repo.ui.status("OBSEXC: computing markers relevant to %i nodes\n"
                        % len(nodes))
         markers = repo.obsstore.relevantmarkers(nodes)
-        rslts = []
-        repo.ui.status("OBSEXC: encoding %i markers\n" % len(markers))
-        remotedata = _pushkeyescape(markers).items()
-        totalbytes = sum(len(d) for k,d in remotedata)
-        sentbytes = 0
-        repo.ui.status("OBSEXC: sending %i pushkey payload (%i bytes)\n"
-                        % (len(remotedata), totalbytes))
-        for key, data in remotedata:
-            repo.ui.progress('OBSEXC', sentbytes, item=key, unit="bytes",
-                             total=totalbytes)
-            rslts.append(remote.pushkey('obsolete', key, '', data))
-            sentbytes += len(data)
-            repo.ui.progress('OBSEXC', sentbytes, item=key, unit="bytes",
-                             total=totalbytes)
-        repo.ui.progress('OBSEXC', None)
-        if [r for r in rslts if not r]:
-            msg = _('failed to push some obsolete markers!\n')
-            repo.ui.warn(msg)
+        if remote.capable('_evoext_pushobsmarkers_0'):
+            repo.ui.status("OBSEXC: writing %i markers\n" % len(markers))
+            obsdata = StringIO()
+            _encodemarkersstream(obsdata, markers)
+            obsdata.seek(0)
+            repo.ui.status("OBSEXC: pushing %i bytes\n"
+                           % len(obsdata.getvalue()))
+            remote.evoext_pushobsmarkers_0(obsdata)
+        else:
+            rslts = []
+            repo.ui.status("OBSEXC: encoding %i markers\n" % len(markers))
+            remotedata = _pushkeyescape(markers).items()
+            totalbytes = sum(len(d) for k,d in remotedata)
+            sentbytes = 0
+            repo.ui.status("OBSEXC: sending %i pushkey payload (%i bytes)\n"
+                            % (len(remotedata), totalbytes))
+            for key, data in remotedata:
+                repo.ui.progress('OBSEXC', sentbytes, item=key, unit="bytes",
+                                 total=totalbytes)
+                rslts.append(remote.pushkey('obsolete', key, '', data))
+                sentbytes += len(data)
+                repo.ui.progress('OBSEXC', sentbytes, item=key, unit="bytes",
+                                 total=totalbytes)
+            repo.ui.progress('OBSEXC', None)
+            if [r for r in rslts if not r]:
+                msg = _('failed to push some obsolete markers!\n')
+                repo.ui.warn(msg)
         repo.ui.status("OBSEXC: DONE\n")
+
+
+@eh.addattr(wireproto.wirepeer, 'evoext_pushobsmarkers_0')
+def client_pushobsmarkers(self, obsfile):
+    """wireprotocol peer method"""
+    self.requirecap('_evoext_pushobsmarkers_0',
+                    _('push obsolete markers faster'))
+    ret, output = self._callpush('evoext_pushobsmarkers_0', obsfile)
+    for l in output.splitlines(True):
+        self.ui.status(_('remote: '), l)
+    return ret
+
+
+def srv_pushobsmarkers(repo, proto):
+    """wireprotocol command"""
+    fp = StringIO()
+    proto.redirect()
+    proto.getfile(fp)
+    data = fp.getvalue()
+    fp.close()
+    lock = repo.lock()
+    try:
+        tr = repo.transaction('pushkey: obsolete markers')
+        try:
+            repo.obsstore.mergemarkers(tr, data)
+            tr.close()
+        finally:
+            tr.release()
+    finally:
+        lock.release()
+    return wireproto.pushres(0)
+
+@eh.wrapfunction(wireproto, 'capabilities')
+def capabilities(orig, repo, proto):
+    """wrapper to advertise new capability"""
+    caps = orig(repo, proto)
+    if obsolete._enabled:
+        caps += ' _evoext_pushobsmarkers_0'
+    return caps
+
+@eh.extsetup
+def _installwireprotocol(ui):
+    wireproto.commands['evoext_pushobsmarkers_0'] = (srv_pushobsmarkers, '')