--- a/mercurial/wireprotoframing.py Mon Aug 27 13:30:44 2018 -0700
+++ b/mercurial/wireprotoframing.py Wed Sep 05 09:06:40 2018 -0700
@@ -388,16 +388,14 @@
def createbytesresponseframesfromgen(stream, requestid, gen,
maxframesize=DEFAULT_MAX_FRAME_SIZE):
- overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
+ """Generator of frames from a generator of byte chunks.
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
- payload=overall)
-
+ This assumes that another frame will follow whatever this emits. i.e.
+ this always emits the continuation flag and never emits the end-of-stream
+ flag.
+ """
cb = util.chunkbuffer(gen)
-
- flags = 0
+ flags = FLAG_COMMAND_RESPONSE_CONTINUATION
while True:
chunk = cb.read(maxframesize)
@@ -411,12 +409,20 @@
flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
- flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
- flags |= FLAG_COMMAND_RESPONSE_EOS
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=flags,
- payload=b'')
+def createcommandresponseokframe(stream, requestid):
+ overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
+
+ return stream.makeframe(requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+ payload=overall)
+
+def createcommandresponseeosframe(stream, requestid):
+ """Create an empty payload frame representing command end-of-stream."""
+ return stream.makeframe(requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=FLAG_COMMAND_RESPONSE_EOS,
+ payload=b'')
def createcommanderrorresponse(stream, requestid, message, args=None):
# TODO should this be using a list of {'msg': ..., 'args': {}} so atom
@@ -686,14 +692,69 @@
'framegen': result,
}
- def oncommandresponsereadygen(self, stream, requestid, gen):
- """Signal that a bytes response is ready, with data as a generator."""
+ def oncommandresponsereadyobjects(self, stream, requestid, objs):
+ """Signal that objects are ready to be sent to the client.
+
+ ``objs`` is an iterable of objects (typically a generator) that will
+ be encoded via CBOR and added to frames, which will be sent to the
+ client.
+ """
ensureserverstream(stream)
+ # We need to take care over exception handling. Uncaught exceptions
+ # when generating frames could lead to premature end of the frame
+ # stream and the possibility of the server or client process getting
+ # in a bad state.
+ #
+ # Keep in mind that if ``objs`` is a generator, advancing it could
+ # raise exceptions that originated in e.g. wire protocol command
+ # functions. That is why we differentiate between exceptions raised
+ # when iterating versus other exceptions that occur.
+ #
+ # In all cases, when the function finishes, the request is fully
+ # handled and no new frames for it should be seen.
+
def sendframes():
- for frame in createbytesresponseframesfromgen(stream, requestid,
- gen):
- yield frame
+ emitted = False
+ while True:
+ try:
+ o = next(objs)
+ except StopIteration:
+ if emitted:
+ yield createcommandresponseeosframe(stream, requestid)
+ break
+
+ except error.WireprotoCommandError as e:
+ for frame in createcommanderrorresponse(
+ stream, requestid, e.message, e.messageargs):
+ yield frame
+ break
+
+ except Exception as e:
+ for frame in createerrorframe(stream, requestid,
+ '%s' % e,
+ errtype='server'):
+ yield frame
+
+ break
+
+ try:
+ if not emitted:
+ yield createcommandresponseokframe(stream, requestid)
+ emitted = True
+
+ # TODO buffer chunks so emitted frame payloads can be
+ # larger.
+ for frame in createbytesresponseframesfromgen(
+ stream, requestid, cborutil.streamencode(o)):
+ yield frame
+ except Exception as e:
+ for frame in createerrorframe(stream, requestid,
+ '%s' % e,
+ errtype='server'):
+ yield frame
+
+ break
self._activecommands.remove(requestid)