--- a/mercurial/wireprotoframing.py Fri Oct 05 09:23:06 2018 -0700
+++ b/mercurial/wireprotoframing.py Fri Oct 05 10:29:36 2018 -0700
@@ -365,75 +365,6 @@
if done:
break
-def createcommandresponseframesfrombytes(stream, requestid, data,
- maxframesize=DEFAULT_MAX_FRAME_SIZE):
- """Create a raw frame to send a bytes response from static bytes input.
-
- Returns a generator of bytearrays.
- """
- # Automatically send the overall CBOR response map.
- overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
- if len(overall) > maxframesize:
- raise error.ProgrammingError('not yet implemented')
-
- # Simple case where we can fit the full response in a single frame.
- if len(overall) + len(data) <= maxframesize:
- flags = FLAG_COMMAND_RESPONSE_EOS
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=flags,
- payload=overall + data)
- return
-
- # It's easier to send the overall CBOR map in its own frame than to track
- # offsets.
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
- payload=overall)
-
- offset = 0
- while True:
- chunk = data[offset:offset + maxframesize]
- offset += len(chunk)
- done = offset == len(data)
-
- if done:
- flags = FLAG_COMMAND_RESPONSE_EOS
- else:
- flags = FLAG_COMMAND_RESPONSE_CONTINUATION
-
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=flags,
- payload=chunk)
-
- if done:
- break
-
-def createbytesresponseframesfromgen(stream, requestid, gen,
- maxframesize=DEFAULT_MAX_FRAME_SIZE):
- """Generator of frames from a generator of byte chunks.
-
- This assumes that another frame will follow whatever this emits. i.e.
- this always emits the continuation flag and never emits the end-of-stream
- flag.
- """
- cb = util.chunkbuffer(gen)
- flags = FLAG_COMMAND_RESPONSE_CONTINUATION
-
- while True:
- chunk = cb.read(maxframesize)
- if not chunk:
- break
-
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=flags,
- payload=chunk)
-
- flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
-
def createcommandresponseokframe(stream, requestid):
overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
@@ -1020,30 +951,6 @@
return meth(frame)
- def oncommandresponseready(self, stream, requestid, data):
- """Signal that a bytes response is ready to be sent to the client.
-
- The raw bytes response is passed as an argument.
- """
- ensureserverstream(stream)
-
- def sendframes():
- for frame in createcommandresponseframesfrombytes(stream, requestid,
- data):
- yield frame
-
- self._activecommands.remove(requestid)
-
- result = sendframes()
-
- if self._deferoutput:
- self._bufferedframegens.append(result)
- return 'noop', {}
- else:
- return 'sendframes', {
- 'framegen': result,
- }
-
def oncommandresponsereadyobjects(self, stream, requestid, objs):
"""Signal that objects are ready to be sent to the client.
@@ -1053,6 +960,10 @@
"""
ensureserverstream(stream)
+ # A more robust solution would be to check for objs.{next,__next__}.
+ if isinstance(objs, list):
+ objs = iter(objs)
+
# We need to take care over exception handling. Uncaught exceptions
# when generating frames could lead to premature end of the frame
# stream and the possibility of the server or client process getting
--- a/tests/test-wireproto-serverreactor.py Fri Oct 05 09:23:06 2018 -0700
+++ b/tests/test-wireproto-serverreactor.py Fri Oct 05 10:29:36 2018 -0700
@@ -225,19 +225,22 @@
results.append(self._sendsingleframe(
reactor, ffs(b'1 1 stream-begin command-request new '
b"cbor:{b'name': b'command'}")))
- result = reactor.oncommandresponseready(outstream, 1, b'response1')
+ result = reactor.oncommandresponsereadyobjects(
+ outstream, 1, [b'response1'])
self.assertaction(result, b'sendframes')
list(result[1][b'framegen'])
results.append(self._sendsingleframe(
reactor, ffs(b'1 1 stream-begin command-request new '
b"cbor:{b'name': b'command'}")))
- result = reactor.oncommandresponseready(outstream, 1, b'response2')
+ result = reactor.oncommandresponsereadyobjects(
+ outstream, 1, [b'response2'])
self.assertaction(result, b'sendframes')
list(result[1][b'framegen'])
results.append(self._sendsingleframe(
reactor, ffs(b'1 1 stream-begin command-request new '
b"cbor:{b'name': b'command'}")))
- result = reactor.oncommandresponseready(outstream, 1, b'response3')
+ result = reactor.oncommandresponsereadyobjects(
+ outstream, 1, [b'response3'])
self.assertaction(result, b'sendframes')
list(result[1][b'framegen'])
@@ -364,10 +367,13 @@
list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
outstream = reactor.makeoutputstream()
- result = reactor.oncommandresponseready(outstream, 1, b'response')
+ result = reactor.oncommandresponsereadyobjects(
+ outstream, 1, [b'response'])
self.assertaction(result, b'sendframes')
self.assertframesequal(result[1][b'framegen'], [
- b'1 2 stream-begin command-response eos %sresponse' % OK,
+ b'1 2 stream-begin command-response continuation %s' % OK,
+ b'1 2 0 command-response continuation cbor:b"response"',
+ b'1 2 0 command-response eos ',
])
def testmultiframeresponse(self):
@@ -380,12 +386,16 @@
list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
outstream = reactor.makeoutputstream()
- result = reactor.oncommandresponseready(outstream, 1, first + second)
+ result = reactor.oncommandresponsereadyobjects(
+ outstream, 1, [first + second])
self.assertaction(result, b'sendframes')
self.assertframesequal(result[1][b'framegen'], [
b'1 2 stream-begin command-response continuation %s' % OK,
+ b'1 2 0 command-response continuation Y\x80d',
b'1 2 0 command-response continuation %s' % first,
- b'1 2 0 command-response eos %s' % second,
+ b'1 2 0 command-response continuation %s' % second,
+ b'1 2 0 command-response continuation ',
+ b'1 2 0 command-response eos '
])
def testservererror(self):
@@ -412,12 +422,15 @@
self.assertaction(results[0], b'runcommand')
outstream = reactor.makeoutputstream()
- result = reactor.oncommandresponseready(outstream, 1, b'response')
+ result = reactor.oncommandresponsereadyobjects(
+ outstream, 1, [b'response'])
self.assertaction(result, b'noop')
result = reactor.oninputeof()
self.assertaction(result, b'sendframes')
self.assertframesequal(result[1][b'framegen'], [
- b'1 2 stream-begin command-response eos %sresponse' % OK,
+ b'1 2 stream-begin command-response continuation %s' % OK,
+ b'1 2 0 command-response continuation cbor:b"response"',
+ b'1 2 0 command-response eos ',
])
def testmultiplecommanddeferresponse(self):
@@ -427,15 +440,21 @@
list(sendcommandframes(reactor, instream, 3, b'command2', {}))
outstream = reactor.makeoutputstream()
- result = reactor.oncommandresponseready(outstream, 1, b'response1')
+ result = reactor.oncommandresponsereadyobjects(
+ outstream, 1, [b'response1'])
self.assertaction(result, b'noop')
- result = reactor.oncommandresponseready(outstream, 3, b'response2')
+ result = reactor.oncommandresponsereadyobjects(
+ outstream, 3, [b'response2'])
self.assertaction(result, b'noop')
result = reactor.oninputeof()
self.assertaction(result, b'sendframes')
self.assertframesequal(result[1][b'framegen'], [
- b'1 2 stream-begin command-response eos %sresponse1' % OK,
- b'3 2 0 command-response eos %sresponse2' % OK,
+ b'1 2 stream-begin command-response continuation %s' % OK,
+ b'1 2 0 command-response continuation cbor:b"response1"',
+ b'1 2 0 command-response eos ',
+ b'3 2 0 command-response continuation %s' % OK,
+ b'3 2 0 command-response continuation cbor:b"response2"',
+ b'3 2 0 command-response eos ',
])
def testrequestidtracking(self):
@@ -447,16 +466,22 @@
# Register results for commands out of order.
outstream = reactor.makeoutputstream()
- reactor.oncommandresponseready(outstream, 3, b'response3')
- reactor.oncommandresponseready(outstream, 1, b'response1')
- reactor.oncommandresponseready(outstream, 5, b'response5')
+ reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3'])
+ reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1'])
+ reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5'])
result = reactor.oninputeof()
self.assertaction(result, b'sendframes')
self.assertframesequal(result[1][b'framegen'], [
- b'3 2 stream-begin command-response eos %sresponse3' % OK,
- b'1 2 0 command-response eos %sresponse1' % OK,
- b'5 2 0 command-response eos %sresponse5' % OK,
+ b'3 2 stream-begin command-response continuation %s' % OK,
+ b'3 2 0 command-response continuation cbor:b"response3"',
+ b'3 2 0 command-response eos ',
+ b'1 2 0 command-response continuation %s' % OK,
+ b'1 2 0 command-response continuation cbor:b"response1"',
+ b'1 2 0 command-response eos ',
+ b'5 2 0 command-response continuation %s' % OK,
+ b'5 2 0 command-response continuation cbor:b"response5"',
+ b'5 2 0 command-response eos ',
])
def testduplicaterequestonactivecommand(self):
@@ -477,7 +502,7 @@
instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'command1', {}))
outstream = reactor.makeoutputstream()
- reactor.oncommandresponseready(outstream, 1, b'response')
+ reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
# We've registered the response but haven't sent it. From the
# perspective of the reactor, the command is still active.
@@ -494,7 +519,7 @@
instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'command1', {}))
outstream = reactor.makeoutputstream()
- res = reactor.oncommandresponseready(outstream, 1, b'response')
+ res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
list(res[1][b'framegen'])
results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))