--- a/mercurial/httppeer.py Sat Apr 14 11:49:57 2018 -0700
+++ b/mercurial/httppeer.py Sat Apr 14 11:50:19 2018 -0700
@@ -13,7 +13,6 @@
import os
import socket
import struct
-import sys
import tempfile
import weakref
@@ -36,6 +35,7 @@
wireprotoframing,
wireprototypes,
wireprotov1peer,
+ wireprotov2peer,
wireprotov2server,
)
@@ -522,6 +522,8 @@
reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
buffersends=True)
+ handler = wireprotov2peer.clienthandler(ui, reactor)
+
url = '%s/%s' % (apiurl, permission)
if len(requests) > 1:
@@ -529,20 +531,11 @@
else:
url += '/%s' % requests[0][0]
- # Request ID to (request, future)
- requestmap = {}
-
for command, args, f in requests:
- request, action, meta = reactor.callcommand(command, args)
- assert action == 'noop'
-
- requestmap[request.requestid] = (request, f)
-
- action, meta = reactor.flushcommands()
- assert action == 'sendframes'
+ assert not list(handler.callcommand(command, args, f))
# TODO stream this.
- body = b''.join(map(bytes, meta['framegen']))
+ body = b''.join(map(bytes, handler.flushcommands()))
# TODO modify user-agent to reflect v2
headers = {
@@ -564,7 +557,7 @@
ui.traceback()
raise IOError(None, e)
- return reactor, requestmap, res
+ return handler, res
class queuedcommandfuture(pycompat.futures.Future):
"""Wraps result() on command futures to trigger submission on call."""
@@ -684,7 +677,7 @@
'pull': 'ro',
}[permissions.pop()]
- reactor, requests, resp = sendv2request(
+ handler, resp = sendv2request(
self._ui, self._opener, self._requestbuilder, self._apiurl,
permission, calls)
@@ -692,9 +685,7 @@
self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
self._responsef = self._responseexecutor.submit(self._handleresponse,
- reactor,
- requests,
- resp)
+ handler, resp)
def close(self):
if self._closed:
@@ -723,62 +714,11 @@
self._futures = None
- def _handleresponse(self, reactor, requests, resp):
+ def _handleresponse(self, handler, resp):
# Called in a thread to read the response.
- results = {k: [] for k in requests}
-
- while True:
- frame = wireprotoframing.readframe(resp)
- if frame is None:
- break
-
- self._ui.note(_('received %r\n') % frame)
-
- # Guard against receiving a frame with a request ID that we
- # didn't issue. This should never happen.
- request, f = requests.get(frame.requestid, [None, None])
-
- action, meta = reactor.onframerecv(frame)
-
- if action == 'responsedata':
- assert request.requestid == meta['request'].requestid
-
- result = results[request.requestid]
-
- if meta['cbor']:
- payload = util.bytesio(meta['data'])
-
- decoder = cbor.CBORDecoder(payload)
- while payload.tell() + 1 < len(meta['data']):
- try:
- result.append(decoder.decode())
- except Exception:
- pycompat.future_set_exception_info(
- f, sys.exc_info()[1:])
- continue
- else:
- result.append(meta['data'])
-
- if meta['eos']:
- f.set_result(result)
- del results[request.requestid]
-
- elif action == 'error':
- e = error.RepoError(meta['message'])
-
- if f:
- f.set_exception(e)
- else:
- raise e
-
- else:
- e = error.ProgrammingError('unhandled action: %s' % action)
-
- if f:
- f.set_exception(e)
- else:
- raise e
+ while handler.readframe(resp):
+ pass
# TODO implement interface for version 2 peers
@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,