comparison mercurial/wireprotoframing.py @ 37728:564a3eec6e63

wireprotov2: add support for more response types. This adds types to represent error and generator responses from server commands. Differential Revision: https://phab.mercurial-scm.org/D3388
author Gregory Szorc <gregory.szorc@gmail.com>
date Sun, 15 Apr 2018 10:37:29 -0700
parents 0c184ca594bb
children 36f487a332ad
comparison
equal deleted inserted replaced
37727:5cdde6158426 37728:564a3eec6e63
383 flags=flags, 383 flags=flags,
384 payload=chunk) 384 payload=chunk)
385 385
386 if done: 386 if done:
387 break 387 break
388
def createbytesresponseframesfromgen(stream, requestid, gen,
                                     maxframesize=DEFAULT_MAX_FRAME_SIZE):
    """Generate command response frames for data produced by a generator.

    ``stream`` is the object whose ``makeframe()`` builds every frame and
    ``requestid`` is forwarded to each ``makeframe()`` call. ``gen`` is
    wrapped in a ``util.chunkbuffer`` and consumed through
    ``read(maxframesize)``.

    The emitted sequence is:

    1. A frame holding the canonical CBOR encoding of
       ``{b'status': b'ok'}``, flagged as a continuation.
    2. One frame per non-empty chunk read from the buffer. The first of
       these carries no flags; later ones carry the continuation flag.
    3. A final frame with an empty payload carrying only the end-of-stream
       flag.
    """
    overall = cbor.dumps({b'status': b'ok'}, canonical=True)

    yield stream.makeframe(requestid=requestid,
                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
                           flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
                           payload=overall)

    cb = util.chunkbuffer(gen)

    flags = 0

    while True:
        chunk = cb.read(maxframesize)
        if not chunk:
            break

        yield stream.makeframe(requestid=requestid,
                               typeid=FRAME_TYPE_COMMAND_RESPONSE,
                               flags=flags,
                               payload=chunk)

        flags |= FLAG_COMMAND_RESPONSE_CONTINUATION

    # The terminating frame must not be marked as a continuation. Clear the
    # bit with a mask instead of XOR: XOR toggles, so when ``gen`` produced
    # no data (``flags`` still 0) it would *set* the continuation bit on the
    # end-of-stream frame.
    flags &= ~FLAG_COMMAND_RESPONSE_CONTINUATION
    flags |= FLAG_COMMAND_RESPONSE_EOS
    yield stream.makeframe(requestid=requestid,
                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
                           flags=flags,
                           payload=b'')
420
def createcommanderrorresponse(stream, requestid, message, args=None):
    """Generate the single frame reporting that a command failed.

    The payload is a canonical CBOR map with ``b'status'`` set to
    ``b'error'`` and a nested ``b'error'`` map holding ``message`` under
    ``b'message'``. ``args`` is stored under ``b'args'`` only when it is
    truthy. The frame is flagged as end-of-stream.
    """
    errordetails = {b'message': message}
    if args:
        errordetails[b'args'] = args

    response = {
        b'status': b'error',
        b'error': errordetails,
    }
    payload = cbor.dumps(response, canonical=True)

    yield stream.makeframe(requestid=requestid,
                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
                           flags=FLAG_COMMAND_RESPONSE_EOS,
                           payload=payload)
388 438
389 def createerrorframe(stream, requestid, msg, errtype): 439 def createerrorframe(stream, requestid, msg, errtype):
390 # TODO properly handle frame size limits. 440 # TODO properly handle frame size limits.
391 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE 441 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
392 442
632 else: 682 else:
633 return 'sendframes', { 683 return 'sendframes', {
634 'framegen': result, 684 'framegen': result,
635 } 685 }
636 686
def oncommandresponsereadygen(self, stream, requestid, gen):
    """Signal that a bytes response is ready, with data as a generator.

    The request is dropped from the active commands immediately; the
    response frames themselves are only produced when the returned (or
    buffered) frame generator is iterated.
    """
    ensureserverstream(stream)

    self._activecommands.remove(requestid)

    def responseframes():
        # Lazily relay every frame built from ``gen``.
        for responseframe in createbytesresponseframesfromgen(
                stream, requestid, gen):
            yield responseframe

    return self._handlesendframes(responseframes())
699
637 def oninputeof(self): 700 def oninputeof(self):
638 """Signals that end of input has been received. 701 """Signals that end of input has been received.
639 702
640 No more frames will be received. All pending activity should be 703 No more frames will be received. All pending activity should be
641 completed. 704 completed.
653 716
654 return 'sendframes', { 717 return 'sendframes', {
655 'framegen': makegen(), 718 'framegen': makegen(),
656 } 719 }
657 720
def _handlesendframes(self, framegen):
    """Either hand ``framegen`` to the caller or queue it for later.

    Returns an ``(action, meta)`` pair: ``('sendframes', {'framegen': ...})``
    when output is not deferred, otherwise ``('noop', {})`` after appending
    the generator to the buffered frame generators.
    """
    if not self._deferoutput:
        return 'sendframes', {'framegen': framegen}

    # Output is deferred: remember the generator and emit nothing for now.
    self._bufferedframegens.append(framegen)
    return 'noop', {}
729
def onservererror(self, stream, requestid, msg):
    """Report a server error for a request.

    The request is dropped from the active commands immediately. The error
    frames (created with ``errtype='server'``) are produced lazily and are
    either returned for sending or buffered, as decided by
    ``_handlesendframes()``.
    """
    ensureserverstream(stream)

    self._activecommands.remove(requestid)

    def errorframes():
        for errorframe in createerrorframe(stream, requestid, msg,
                                           errtype='server'):
            yield errorframe

    return self._handlesendframes(errorframes())
741
def oncommanderror(self, stream, requestid, message, args=None):
    """Called when a command encountered an error before sending output.

    The request is dropped from the active commands immediately. The error
    response frames are produced lazily and are either returned for sending
    or buffered, as decided by ``_handlesendframes()``.
    """
    ensureserverstream(stream)

    self._activecommands.remove(requestid)

    def errorframes():
        for errorframe in createcommanderrorresponse(stream, requestid,
                                                     message, args):
            yield errorframe

    return self._handlesendframes(errorframes())
665 754
666 def makeoutputstream(self): 755 def makeoutputstream(self):
667 """Create a stream to be used for sending data to the client.""" 756 """Create a stream to be used for sending data to the client."""
668 streamid = self._nextoutgoingstreamid 757 streamid = self._nextoutgoingstreamid
669 self._nextoutgoingstreamid += 2 758 self._nextoutgoingstreamid += 2