changeset 11592:26e0782b8380

protocol: unify client unbundle support - introduce _callpush helper - factor out differences in result handling into helpers - unify
author Matt Mackall <mpm@selenic.com>
date Wed, 14 Jul 2010 17:12:18 -0500
parents 0d9cb3f3b0a1
children d054cc5c7737
files mercurial/httprepo.py mercurial/sshrepo.py mercurial/wireproto.py
diffstat 3 files changed, 46 insertions(+), 55 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/httprepo.py	Wed Jul 14 17:09:31 2010 -0500
+++ b/mercurial/httprepo.py	Wed Jul 14 17:12:18 2010 -0500
@@ -138,19 +138,7 @@
             # if using keepalive, allow connection to be reused
             fp.close()
 
-    def _abort(self, exception):
-        raise exception
-
-    def _decompress(self, stream):
-        return util.chunkbuffer(zgenerator(stream))
-
-    def unbundle(self, cg, heads, source):
-        '''Send cg (a readable file-like object representing the
-        changegroup to push, typically a chunkbuffer object) to the
-        remote server as a bundle. Return an integer response code:
-        non-zero indicates a successful push (see
-        localrepository.addchangegroup()), and zero indicates either
-        error or nothing to push.'''
+    def _callpush(self, cmd, cg, **args):
         # have to stream bundle to a temp file because we do not have
         # http 1.1 chunked transfer.
 
@@ -170,21 +158,12 @@
 
         tempname = changegroup.writebundle(cg, None, type)
         fp = url.httpsendfile(tempname, "rb")
+        headers = {'Content-Type': 'application/mercurial-0.1'}
+
         try:
             try:
-                resp = self._call(
-                     'unbundle', data=fp,
-                     headers={'Content-Type': 'application/mercurial-0.1'},
-                     heads=' '.join(map(hex, heads)))
-                resp_code, output = resp.split('\n', 1)
-                try:
-                    ret = int(resp_code)
-                except ValueError, err:
-                    raise error.ResponseError(
-                            _('push failed (unexpected response):'), resp)
-                for l in output.splitlines(True):
-                    self.ui.status(_('remote: '), l)
-                return ret
+                r = self._call(cmd, data=fp, headers=headers, **args)
+                return r.split('\n', 1)
             except socket.error, err:
                 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
                     raise util.Abort(_('push failed: %s') % err.args[1])
@@ -193,6 +172,12 @@
             fp.close()
             os.unlink(tempname)
 
+    def _abort(self, exception):
+        raise exception
+
+    def _decompress(self, stream):
+        return util.chunkbuffer(zgenerator(stream))
+
 class httpsrepository(httprepository):
     def __init__(self, ui, path):
         if not url.has_https:
--- a/mercurial/sshrepo.py	Wed Jul 14 17:09:31 2010 -0500
+++ b/mercurial/sshrepo.py	Wed Jul 14 17:12:18 2010 -0500
@@ -128,6 +128,21 @@
         self._callstream(cmd, **args)
         return self._recv()
 
+    def _callpush(self, cmd, fp, **args):
+        r = self._call(cmd, **args)
+        if r:
+            return '', r
+        while 1:
+            d = fp.read(4096)
+            if not d:
+                break
+            self._send(d)
+        self._send("", flush=True)
+        r = self._recv()
+        if r:
+            return '', r
+        return self._recv(), ''
+
     def _decompress(self, stream):
         return stream
 
@@ -155,35 +170,6 @@
     def unlock(self):
         self._call("unlock")
 
-    def unbundle(self, cg, heads, source):
-        '''Send cg (a readable file-like object representing the
-        changegroup to push, typically a chunkbuffer object) to the
-        remote server as a bundle. Return an integer indicating the
-        result of the push (see localrepository.addchangegroup()).'''
-        d = self._call("unbundle", heads=' '.join(map(hex, heads)))
-        if d:
-            # remote may send "unsynced changes"
-            self._abort(error.RepoError(_("push refused: %s") % d))
-
-        while 1:
-            d = cg.read(4096)
-            if not d:
-                break
-            self._send(d)
-
-        self._send("", flush=True)
-
-        r = self._recv()
-        if r:
-            # remote may send "unsynced changes"
-            self._abort(error.RepoError(_("push failed: %s") % r))
-
-        r = self._recv()
-        try:
-            return int(r)
-        except:
-            self._abort(error.ResponseError(_("unexpected response:"), r))
-
     def addchangegroup(self, cg, source, url):
         '''Send a changegroup to the remote server.  Return an integer
         similar to unbundle(). DEPRECATED, since it requires locking the
--- a/mercurial/wireproto.py	Wed Jul 14 17:09:31 2010 -0500
+++ b/mercurial/wireproto.py	Wed Jul 14 17:12:18 2010 -0500
@@ -103,6 +103,26 @@
         return self._decompress(self._callstream("changegroupsubset",
                                                  bases=bases, heads=heads))
 
+    def unbundle(self, cg, heads, source):
+        '''Send cg (a readable file-like object representing the
+        changegroup to push, typically a chunkbuffer object) to the
+        remote server as a bundle. Return an integer indicating the
+        result of the push (see localrepository.addchangegroup()).'''
+
+        ret, output = self._callpush("unbundle", cg, heads=' '.join(map(hex, heads)))
+        if ret == "":
+            raise error.ResponseError(
+                _('push failed:'), output)
+        try:
+            ret = int(ret)
+        except ValueError, err:
+            raise error.ResponseError(
+                _('push failed (unexpected response):'), ret)
+
+        for l in output.splitlines(True):
+            self.ui.status(_('remote: '), l)
+        return ret
+
 # server side
 
 def dispatch(repo, proto, command):