comparison mercurial/wireprotoframing.py @ 37289:5fadc63ac99f

wireproto: explicit API to create outgoing streams It is better to create outgoing streams through the reactor so the reactor knows about what streams are active and can track them accordingly. Test output changes slightly because frames from subsequent responses no longer have the "stream begin" stream flag set because the stream is now used across all responses. Differential Revision: https://phab.mercurial-scm.org/D2947
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 26 Mar 2018 13:59:56 -0700
parents 9bfcbe4f4745
children cc5a040fe150
comparison
equal deleted inserted replaced
37288:9bfcbe4f4745 37289:5fadc63ac99f
531 send those frames. This is useful for half-duplex transports where the 531 send those frames. This is useful for half-duplex transports where the
532 sender cannot receive until all data has been transmitted. 532 sender cannot receive until all data has been transmitted.
533 """ 533 """
534 self._deferoutput = deferoutput 534 self._deferoutput = deferoutput
535 self._state = 'idle' 535 self._state = 'idle'
536 self._nextoutgoingstreamid = 2
536 self._bufferedframegens = [] 537 self._bufferedframegens = []
537 # stream id -> stream instance for all active streams from the client. 538 # stream id -> stream instance for all active streams from the client.
538 self._incomingstreams = {} 539 self._incomingstreams = {}
540 self._outgoingstreams = {}
539 # request id -> dict of commands that are actively being received. 541 # request id -> dict of commands that are actively being received.
540 self._receivingcommands = {} 542 self._receivingcommands = {}
541 # Request IDs that have been received and are actively being processed. 543 # Request IDs that have been received and are actively being processed.
542 # Once all output for a request has been sent, it is removed from this 544 # Once all output for a request has been sent, it is removed from this
543 # set. 545 # set.
635 637
636 return 'sendframes', { 638 return 'sendframes', {
637 'framegen': createerrorframe(stream, requestid, msg, 639 'framegen': createerrorframe(stream, requestid, msg,
638 application=True), 640 application=True),
639 } 641 }
642
643 def makeoutputstream(self):
644 """Create a stream to be used for sending data to the client."""
645 streamid = self._nextoutgoingstreamid
646 self._nextoutgoingstreamid += 2
647
648 s = stream(streamid)
649 self._outgoingstreams[streamid] = s
650
651 return s
640 652
641 def _makeerrorresult(self, msg): 653 def _makeerrorresult(self, msg):
642 return 'error', { 654 return 'error', {
643 'message': msg, 655 'message': msg,
644 } 656 }