mercurial/httppeer.py
changeset 37719 a656cba08a04
parent 37717 0664be4f0c1f
child 37736 e10b695b9c41
--- a/mercurial/httppeer.py	Sat Apr 14 11:49:57 2018 -0700
+++ b/mercurial/httppeer.py	Sat Apr 14 11:50:19 2018 -0700
@@ -13,7 +13,6 @@
 import os
 import socket
 import struct
-import sys
 import tempfile
 import weakref
 
@@ -36,6 +35,7 @@
     wireprotoframing,
     wireprototypes,
     wireprotov1peer,
+    wireprotov2peer,
     wireprotov2server,
 )
 
@@ -522,6 +522,8 @@
     reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
                                              buffersends=True)
 
+    handler = wireprotov2peer.clienthandler(ui, reactor)
+
     url = '%s/%s' % (apiurl, permission)
 
     if len(requests) > 1:
@@ -529,20 +531,11 @@
     else:
         url += '/%s' % requests[0][0]
 
-    # Request ID to (request, future)
-    requestmap = {}
-
     for command, args, f in requests:
-        request, action, meta = reactor.callcommand(command, args)
-        assert action == 'noop'
-
-        requestmap[request.requestid] = (request, f)
-
-    action, meta = reactor.flushcommands()
-    assert action == 'sendframes'
+        assert not list(handler.callcommand(command, args, f))
 
     # TODO stream this.
-    body = b''.join(map(bytes, meta['framegen']))
+    body = b''.join(map(bytes, handler.flushcommands()))
 
     # TODO modify user-agent to reflect v2
     headers = {
@@ -564,7 +557,7 @@
         ui.traceback()
         raise IOError(None, e)
 
-    return reactor, requestmap, res
+    return handler, res
 
 class queuedcommandfuture(pycompat.futures.Future):
     """Wraps result() on command futures to trigger submission on call."""
@@ -684,7 +677,7 @@
             'pull': 'ro',
         }[permissions.pop()]
 
-        reactor, requests, resp = sendv2request(
+        handler, resp = sendv2request(
             self._ui, self._opener, self._requestbuilder, self._apiurl,
             permission, calls)
 
@@ -692,9 +685,7 @@
 
         self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
         self._responsef = self._responseexecutor.submit(self._handleresponse,
-                                                        reactor,
-                                                        requests,
-                                                        resp)
+                                                        handler, resp)
 
     def close(self):
         if self._closed:
@@ -723,62 +714,11 @@
 
             self._futures = None
 
-    def _handleresponse(self, reactor, requests, resp):
+    def _handleresponse(self, handler, resp):
         # Called in a thread to read the response.
 
-        results = {k: [] for k in requests}
-
-        while True:
-            frame = wireprotoframing.readframe(resp)
-            if frame is None:
-                break
-
-            self._ui.note(_('received %r\n') % frame)
-
-            # Guard against receiving a frame with a request ID that we
-            # didn't issue. This should never happen.
-            request, f = requests.get(frame.requestid, [None, None])
-
-            action, meta = reactor.onframerecv(frame)
-
-            if action == 'responsedata':
-                assert request.requestid == meta['request'].requestid
-
-                result = results[request.requestid]
-
-                if meta['cbor']:
-                    payload = util.bytesio(meta['data'])
-
-                    decoder = cbor.CBORDecoder(payload)
-                    while payload.tell() + 1 < len(meta['data']):
-                        try:
-                            result.append(decoder.decode())
-                        except Exception:
-                            pycompat.future_set_exception_info(
-                                f, sys.exc_info()[1:])
-                            continue
-                else:
-                    result.append(meta['data'])
-
-                if meta['eos']:
-                    f.set_result(result)
-                    del results[request.requestid]
-
-            elif action == 'error':
-                e = error.RepoError(meta['message'])
-
-                if f:
-                    f.set_exception(e)
-                else:
-                    raise e
-
-            else:
-                e = error.ProgrammingError('unhandled action: %s' % action)
-
-                if f:
-                    f.set_exception(e)
-                else:
-                    raise e
+        while handler.readframe(resp):
+            pass
 
 # TODO implement interface for version 2 peers
 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,