wireprotov2: add support for more response types
This adds types to represent error and generator responses from
server commands.
Differential Revision: https://phab.mercurial-scm.org/D3388
--- a/mercurial/wireprotoframing.py Sat Apr 14 15:38:11 2018 -0700
+++ b/mercurial/wireprotoframing.py Sun Apr 15 10:37:29 2018 -0700
@@ -386,6 +386,56 @@
if done:
break
+def createbytesresponseframesfromgen(stream, requestid, gen,
+ maxframesize=DEFAULT_MAX_FRAME_SIZE):
+ overall = cbor.dumps({b'status': b'ok'}, canonical=True)
+
+ yield stream.makeframe(requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+ payload=overall)
+
+ cb = util.chunkbuffer(gen)
+
+ flags = 0
+
+ while True:
+ chunk = cb.read(maxframesize)
+ if not chunk:
+ break
+
+ yield stream.makeframe(requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=flags,
+ payload=chunk)
+
+ flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
+
+ flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
+ flags |= FLAG_COMMAND_RESPONSE_EOS
+ yield stream.makeframe(requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=flags,
+ payload=b'')
+
+def createcommanderrorresponse(stream, requestid, message, args=None):
+ m = {
+ b'status': b'error',
+ b'error': {
+ b'message': message,
+ }
+ }
+
+ if args:
+ m[b'error'][b'args'] = args
+
+ overall = cbor.dumps(m, canonical=True)
+
+ yield stream.makeframe(requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=FLAG_COMMAND_RESPONSE_EOS,
+ payload=overall)
+
def createerrorframe(stream, requestid, msg, errtype):
# TODO properly handle frame size limits.
assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
@@ -634,6 +684,19 @@
'framegen': result,
}
+ def oncommandresponsereadygen(self, stream, requestid, gen):
+ """Signal that a bytes response is ready, with data as a generator."""
+ ensureserverstream(stream)
+
+ def sendframes():
+ for frame in createbytesresponseframesfromgen(stream, requestid,
+ gen):
+ yield frame
+
+ self._activecommands.remove(requestid)
+
+ return self._handlesendframes(sendframes())
+
def oninputeof(self):
"""Signals that end of input has been received.
@@ -655,13 +718,39 @@
'framegen': makegen(),
}
+ def _handlesendframes(self, framegen):
+ if self._deferoutput:
+ self._bufferedframegens.append(framegen)
+ return 'noop', {}
+ else:
+ return 'sendframes', {
+ 'framegen': framegen,
+ }
+
def onservererror(self, stream, requestid, msg):
ensureserverstream(stream)
- return 'sendframes', {
- 'framegen': createerrorframe(stream, requestid, msg,
- errtype='server'),
- }
+ def sendframes():
+ for frame in createerrorframe(stream, requestid, msg,
+ errtype='server'):
+ yield frame
+
+ self._activecommands.remove(requestid)
+
+ return self._handlesendframes(sendframes())
+
+ def oncommanderror(self, stream, requestid, message, args=None):
+ """Called when a command encountered an error before sending output."""
+ ensureserverstream(stream)
+
+ def sendframes():
+ for frame in createcommanderrorresponse(stream, requestid, message,
+ args):
+ yield frame
+
+ self._activecommands.remove(requestid)
+
+ return self._handlesendframes(sendframes())
def makeoutputstream(self):
"""Create a stream to be used for sending data to the client."""
--- a/mercurial/wireprototypes.py Sat Apr 14 15:38:11 2018 -0700
+++ b/mercurial/wireprototypes.py Sun Apr 15 10:37:29 2018 -0700
@@ -106,6 +106,22 @@
def __init__(self, v):
self.value = v
+class v2errorresponse(object):
+ """Represents a command error for version 2 transports."""
+ def __init__(self, message, args=None):
+ self.message = message
+ self.args = args
+
+class v2streamingresponse(object):
+ """A response whose data is supplied by a generator.
+
+ The generator can either consist of data structures to CBOR
+ encode or a stream of already-encoded bytes.
+ """
+ def __init__(self, gen, compressible=True):
+ self.gen = gen
+ self.compressible = compressible
+
# list of nodes encoding / decoding
def decodelist(l, sep=' '):
if l:
--- a/mercurial/wireprotov2server.py Sat Apr 14 15:38:11 2018 -0700
+++ b/mercurial/wireprotov2server.py Sun Apr 15 10:37:29 2018 -0700
@@ -306,6 +306,15 @@
action, meta = reactor.oncommandresponseready(outstream,
command['requestid'],
encoded)
+ elif isinstance(rsp, wireprototypes.v2streamingresponse):
+ action, meta = reactor.oncommandresponsereadygen(outstream,
+ command['requestid'],
+ rsp.gen)
+ elif isinstance(rsp, wireprototypes.v2errorresponse):
+ action, meta = reactor.oncommanderror(outstream,
+ command['requestid'],
+ rsp.message,
+ rsp.args)
else:
action, meta = reactor.onservererror(
_('unhandled response type from wire proto command'))