--- a/mercurial/wireprotoframing.py Mon Mar 26 13:57:22 2018 -0700
+++ b/mercurial/wireprotoframing.py Mon Mar 26 11:00:16 2018 -0700
@@ -25,15 +25,26 @@
stringutil,
)
-FRAME_HEADER_SIZE = 6
+FRAME_HEADER_SIZE = 8
DEFAULT_MAX_FRAME_SIZE = 32768
+STREAM_FLAG_BEGIN_STREAM = 0x01
+STREAM_FLAG_END_STREAM = 0x02
+STREAM_FLAG_ENCODING_APPLIED = 0x04
+
+STREAM_FLAGS = {
+ b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
+ b'stream-end': STREAM_FLAG_END_STREAM,
+ b'encoded': STREAM_FLAG_ENCODING_APPLIED,
+}
+
FRAME_TYPE_COMMAND_NAME = 0x01
FRAME_TYPE_COMMAND_ARGUMENT = 0x02
FRAME_TYPE_COMMAND_DATA = 0x03
FRAME_TYPE_BYTES_RESPONSE = 0x04
FRAME_TYPE_ERROR_RESPONSE = 0x05
FRAME_TYPE_TEXT_OUTPUT = 0x06
+FRAME_TYPE_STREAM_SETTINGS = 0x08
FRAME_TYPES = {
b'command-name': FRAME_TYPE_COMMAND_NAME,
@@ -42,6 +53,7 @@
b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
b'error-response': FRAME_TYPE_ERROR_RESPONSE,
b'text-output': FRAME_TYPE_TEXT_OUTPUT,
+ b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
}
FLAG_COMMAND_NAME_EOS = 0x01
@@ -94,6 +106,7 @@
FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
FRAME_TYPE_TEXT_OUTPUT: {},
+ FRAME_TYPE_STREAM_SETTINGS: {},
}
ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
@@ -104,6 +117,8 @@
length = attr.ib()
requestid = attr.ib()
+ streamid = attr.ib()
+ streamflags = attr.ib()
typeid = attr.ib()
flags = attr.ib()
@@ -112,25 +127,29 @@
"""Represents a parsed frame."""
requestid = attr.ib()
+ streamid = attr.ib()
+ streamflags = attr.ib()
typeid = attr.ib()
flags = attr.ib()
payload = attr.ib()
-def makeframe(requestid, typeid, flags, payload):
+def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
"""Assemble a frame into a byte array."""
# TODO assert size of payload.
frame = bytearray(FRAME_HEADER_SIZE + len(payload))
# 24 bits length
# 16 bits request id
+ # 8 bits stream id
+ # 8 bits stream flags
# 4 bits type
# 4 bits flags
l = struct.pack(r'<I', len(payload))
frame[0:3] = l[0:3]
- struct.pack_into(r'<H', frame, 3, requestid)
- frame[5] = (typeid << 4) | flags
- frame[6:] = payload
+ struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
+ frame[7] = (typeid << 4) | flags
+ frame[8:] = payload
return frame
@@ -139,20 +158,30 @@
Strings have the form:
- <request-id> <type> <flags> <payload>
+ <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
This can be used by user-facing applications and tests for creating
frames easily without having to type out a bunch of constants.
- Request ID is an integer.
+ Request ID and stream IDs are integers.
- Frame type and flags can be specified by integer or named constant.
+ Stream flags, frame type, and flags can be specified by integer or
+ named constant.
Flags can be delimited by `|` to bitwise OR them together.
"""
- requestid, frametype, frameflags, payload = s.split(b' ', 3)
+ fields = s.split(b' ', 5)
+ requestid, streamid, streamflags, frametype, frameflags, payload = fields
requestid = int(requestid)
+ streamid = int(streamid)
+
+ finalstreamflags = 0
+ for flag in streamflags.split(b'|'):
+ if flag in STREAM_FLAGS:
+ finalstreamflags |= STREAM_FLAGS[flag]
+ else:
+ finalstreamflags |= int(flag)
if frametype in FRAME_TYPES:
frametype = FRAME_TYPES[frametype]
@@ -169,7 +198,8 @@
payload = stringutil.unescapestr(payload)
- return makeframe(requestid=requestid, typeid=frametype,
+ return makeframe(requestid=requestid, streamid=streamid,
+ streamflags=finalstreamflags, typeid=frametype,
flags=finalflags, payload=payload)
def parseheader(data):
@@ -179,17 +209,21 @@
buffer is expected to be large enough to hold a full header.
"""
# 24 bits payload length (little endian)
+ # 16 bits request ID
+ # 8 bits stream ID
+ # 8 bits stream flags
# 4 bits frame type
# 4 bits frame flags
# ... payload
framelength = data[0] + 256 * data[1] + 16384 * data[2]
- requestid = struct.unpack_from(r'<H', data, 3)[0]
- typeflags = data[5]
+ requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
+ typeflags = data[7]
frametype = (typeflags & 0xf0) >> 4
frameflags = typeflags & 0x0f
- return frameheader(framelength, requestid, frametype, frameflags)
+ return frameheader(framelength, requestid, streamid, streamflags,
+ frametype, frameflags)
def readframe(fh):
"""Read a unified framing protocol frame from a file object.
@@ -216,7 +250,8 @@
raise error.Abort(_('frame length error: expected %d; got %d') %
(h.length, len(payload)))
- return frame(h.requestid, h.typeid, h.flags, payload)
+ return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
+ payload)
def createcommandframes(stream, requestid, cmd, args, datafh=None):
"""Create frames necessary to transmit a request to run a command.
@@ -398,12 +433,28 @@
class stream(object):
"""Represents a logical unidirectional series of frames."""
+ def __init__(self, streamid, active=False):
+ self.streamid = streamid
+ self._active = False
+
def makeframe(self, requestid, typeid, flags, payload):
"""Create a frame to be sent out over this stream.
Only returns the frame instance. Does not actually send it.
"""
- return makeframe(requestid, typeid, flags, payload)
+ streamflags = 0
+ if not self._active:
+ streamflags |= STREAM_FLAG_BEGIN_STREAM
+ self._active = True
+
+ return makeframe(requestid, self.streamid, streamflags, typeid, flags,
+ payload)
+
+def ensureserverstream(stream):
+ if stream.streamid % 2:
+ raise error.ProgrammingError('server should only write to even '
+ 'numbered streams; %d is not even' %
+ stream.streamid)
class serverreactor(object):
"""Holds state of a server handling frame-based protocol requests.
@@ -483,6 +534,8 @@
self._deferoutput = deferoutput
self._state = 'idle'
self._bufferedframegens = []
+ # stream id -> stream instance for all active streams from the client.
+ self._incomingstreams = {}
# request id -> dict of commands that are actively being received.
self._receivingcommands = {}
# Request IDs that have been received and are actively being processed.
@@ -496,6 +549,30 @@
Returns a dict with an ``action`` key that details what action,
if any, the consumer should take next.
"""
+ if not frame.streamid % 2:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('received frame with even numbered stream ID: %d') %
+ frame.streamid)
+
+ if frame.streamid not in self._incomingstreams:
+ if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('received frame on unknown inactive stream without '
+ 'beginning of stream flag set'))
+
+ self._incomingstreams[frame.streamid] = stream(frame.streamid)
+
+ if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
+ # TODO handle decoding frames
+ self._state = 'errored'
+ raise error.ProgrammingError('support for decoding stream payloads '
+ 'not yet implemented')
+
+ if frame.streamflags & STREAM_FLAG_END_STREAM:
+ del self._incomingstreams[frame.streamid]
+
handlers = {
'idle': self._onframeidle,
'command-receiving': self._onframecommandreceiving,
@@ -513,6 +590,8 @@
The raw bytes response is passed as an argument.
"""
+ ensureserverstream(stream)
+
def sendframes():
for frame in createbytesresponseframesfrombytes(stream, requestid,
data):
@@ -552,6 +631,8 @@
}
def onapplicationerror(self, stream, requestid, msg):
+ ensureserverstream(stream)
+
return 'sendframes', {
'framegen': createerrorframe(stream, requestid, msg,
application=True),
--- a/tests/test-wireproto-serverreactor.py Mon Mar 26 13:57:22 2018 -0700
+++ b/tests/test-wireproto-serverreactor.py Mon Mar 26 11:00:16 2018 -0700
@@ -23,6 +23,8 @@
assert len(payload) == header.length
yield reactor.onframerecv(framing.frame(header.requestid,
+ header.streamid,
+ header.streamflags,
header.typeid,
header.flags,
payload))
@@ -37,32 +39,32 @@
def testdataexactframesize(self):
data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
- stream = framing.stream()
+ stream = framing.stream(1)
frames = list(framing.createcommandframes(stream, 1, b'command',
{}, data))
self.assertEqual(frames, [
- ffs(b'1 command-name have-data command'),
- ffs(b'1 command-data continuation %s' % data.getvalue()),
- ffs(b'1 command-data eos ')
+ ffs(b'1 1 stream-begin command-name have-data command'),
+ ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
+ ffs(b'1 1 0 command-data eos ')
])
def testdatamultipleframes(self):
data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
- stream = framing.stream()
+ stream = framing.stream(1)
frames = list(framing.createcommandframes(stream, 1, b'command', {},
data))
self.assertEqual(frames, [
- ffs(b'1 command-name have-data command'),
- ffs(b'1 command-data continuation %s' % (
+ ffs(b'1 1 stream-begin command-name have-data command'),
+ ffs(b'1 1 0 command-data continuation %s' % (
b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
- ffs(b'1 command-data eos x'),
+ ffs(b'1 1 0 command-data eos x'),
])
def testargsanddata(self):
data = util.bytesio(b'x' * 100)
- stream = framing.stream()
+ stream = framing.stream(1)
frames = list(framing.createcommandframes(stream, 1, b'command', {
b'key1': b'key1value',
b'key2': b'key2value',
@@ -70,11 +72,11 @@
}, data))
self.assertEqual(frames, [
- ffs(b'1 command-name have-args|have-data command'),
- ffs(br'1 command-argument 0 \x04\x00\x09\x00key1key1value'),
- ffs(br'1 command-argument 0 \x04\x00\x09\x00key2key2value'),
- ffs(br'1 command-argument eoa \x04\x00\x09\x00key3key3value'),
- ffs(b'1 command-data eos %s' % data.getvalue()),
+ ffs(b'1 1 stream-begin command-name have-args|have-data command'),
+ ffs(br'1 1 0 command-argument 0 \x04\x00\x09\x00key1key1value'),
+ ffs(br'1 1 0 command-argument 0 \x04\x00\x09\x00key2key2value'),
+ ffs(br'1 1 0 command-argument eoa \x04\x00\x09\x00key3key3value'),
+ ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
])
def testtextoutputexcessiveargs(self):
@@ -128,64 +130,68 @@
(b'bleh', [], [b'x' * 65536])]))
def testtextoutput1simpleatom(self):
- stream = framing.stream()
+ stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo', [], [])]))
self.assertEqual(val, [
- ffs(br'1 text-output 0 \x03\x00\x00\x00foo'),
+ ffs(br'1 1 stream-begin text-output 0 \x03\x00\x00\x00foo'),
])
def testtextoutput2simpleatoms(self):
- stream = framing.stream()
+ stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo', [], []),
(b'bar', [], []),
]))
self.assertEqual(val, [
- ffs(br'1 text-output 0 \x03\x00\x00\x00foo\x03\x00\x00\x00bar'),
+ ffs(br'1 1 stream-begin text-output 0 '
+ br'\x03\x00\x00\x00foo\x03\x00\x00\x00bar'),
])
def testtextoutput1arg(self):
- stream = framing.stream()
+ stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo %s', [b'val1'], []),
]))
self.assertEqual(val, [
- ffs(br'1 text-output 0 \x06\x00\x00\x01\x04\x00foo %sval1'),
+ ffs(br'1 1 stream-begin text-output 0 '
+ br'\x06\x00\x00\x01\x04\x00foo %sval1'),
])
def testtextoutput2arg(self):
- stream = framing.stream()
+ stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo %s %s', [b'val', b'value'], []),
]))
self.assertEqual(val, [
- ffs(br'1 text-output 0 \x09\x00\x00\x02\x03\x00\x05\x00'
- br'foo %s %svalvalue'),
+ ffs(br'1 1 stream-begin text-output 0 '
+ br'\x09\x00\x00\x02\x03\x00\x05\x00foo %s %svalvalue'),
])
def testtextoutput1label(self):
- stream = framing.stream()
+ stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo', [], [b'label']),
]))
self.assertEqual(val, [
- ffs(br'1 text-output 0 \x03\x00\x01\x00\x05foolabel'),
+ ffs(br'1 1 stream-begin text-output 0 '
+ br'\x03\x00\x01\x00\x05foolabel'),
])
def testargandlabel(self):
- stream = framing.stream()
+ stream = framing.stream(1)
val = list(framing.createtextoutputframe(stream, 1, [
(b'foo %s', [b'arg'], [b'label']),
]))
self.assertEqual(val, [
- ffs(br'1 text-output 0 \x06\x00\x01\x01\x05\x03\x00foo %slabelarg'),
+ ffs(br'1 1 stream-begin text-output 0 '
+ br'\x06\x00\x01\x01\x05\x03\x00foo %slabelarg'),
])
class ServerReactorTests(unittest.TestCase):
@@ -208,7 +214,7 @@
def test1framecommand(self):
"""Receiving a command in a single frame yields request to run it."""
reactor = makereactor()
- stream = framing.stream()
+ stream = framing.stream(1)
results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
self.assertEqual(len(results), 1)
self.assertaction(results[0], 'runcommand')
@@ -224,7 +230,7 @@
def test1argument(self):
reactor = makereactor()
- stream = framing.stream()
+ stream = framing.stream(1)
results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
{b'foo': b'bar'}))
self.assertEqual(len(results), 2)
@@ -239,7 +245,7 @@
def testmultiarguments(self):
reactor = makereactor()
- stream = framing.stream()
+ stream = framing.stream(1)
results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
{b'foo': b'bar', b'biz': b'baz'}))
self.assertEqual(len(results), 3)
@@ -255,7 +261,7 @@
def testsimplecommanddata(self):
reactor = makereactor()
- stream = framing.stream()
+ stream = framing.stream(1)
results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
util.bytesio(b'data!')))
self.assertEqual(len(results), 2)
@@ -270,10 +276,10 @@
def testmultipledataframes(self):
frames = [
- ffs(b'1 command-name have-data mycommand'),
- ffs(b'1 command-data continuation data1'),
- ffs(b'1 command-data continuation data2'),
- ffs(b'1 command-data eos data3'),
+ ffs(b'1 1 stream-begin command-name have-data mycommand'),
+ ffs(b'1 1 0 command-data continuation data1'),
+ ffs(b'1 1 0 command-data continuation data2'),
+ ffs(b'1 1 0 command-data eos data3'),
]
reactor = makereactor()
@@ -291,11 +297,11 @@
def testargumentanddata(self):
frames = [
- ffs(b'1 command-name have-args|have-data command'),
- ffs(br'1 command-argument 0 \x03\x00\x03\x00keyval'),
- ffs(br'1 command-argument eoa \x03\x00\x03\x00foobar'),
- ffs(b'1 command-data continuation value1'),
- ffs(b'1 command-data eos value2'),
+ ffs(b'1 1 stream-begin command-name have-args|have-data command'),
+ ffs(br'1 1 0 command-argument 0 \x03\x00\x03\x00keyval'),
+ ffs(br'1 1 0 command-argument eoa \x03\x00\x03\x00foobar'),
+ ffs(b'1 1 0 command-data continuation value1'),
+ ffs(b'1 1 0 command-data eos value2'),
]
reactor = makereactor()
@@ -314,8 +320,8 @@
def testunexpectedcommandargument(self):
"""Command argument frame when not running a command is an error."""
- result = self._sendsingleframe(makereactor(),
- ffs(b'1 command-argument 0 ignored'))
+ result = self._sendsingleframe(
+ makereactor(), ffs(b'1 1 stream-begin command-argument 0 ignored'))
self.assertaction(result, 'error')
self.assertEqual(result[1], {
'message': b'expected command frame; got 2',
@@ -324,8 +330,8 @@
def testunexpectedcommandargumentreceiving(self):
"""Same as above but the command is receiving."""
results = list(sendframes(makereactor(), [
- ffs(b'1 command-name have-data command'),
- ffs(b'1 command-argument eoa ignored'),
+ ffs(b'1 1 stream-begin command-name have-data command'),
+ ffs(b'1 1 0 command-argument eoa ignored'),
]))
self.assertaction(results[1], 'error')
@@ -336,8 +342,8 @@
def testunexpectedcommanddata(self):
"""Command argument frame when not running a command is an error."""
- result = self._sendsingleframe(makereactor(),
- ffs(b'1 command-data 0 ignored'))
+ result = self._sendsingleframe(
+ makereactor(), ffs(b'1 1 stream-begin command-data 0 ignored'))
self.assertaction(result, 'error')
self.assertEqual(result[1], {
'message': b'expected command frame; got 3',
@@ -346,8 +352,8 @@
def testunexpectedcommanddatareceiving(self):
"""Same as above except the command is receiving."""
results = list(sendframes(makereactor(), [
- ffs(b'1 command-name have-args command'),
- ffs(b'1 command-data eos ignored'),
+ ffs(b'1 1 stream-begin command-name have-args command'),
+ ffs(b'1 1 0 command-data eos ignored'),
]))
self.assertaction(results[1], 'error')
@@ -358,8 +364,8 @@
def testmissingcommandframeflags(self):
"""Command name frame must have flags set."""
- result = self._sendsingleframe(makereactor(),
- ffs(b'1 command-name 0 command'))
+ result = self._sendsingleframe(
+ makereactor(), ffs(b'1 1 stream-begin command-name 0 command'))
self.assertaction(result, 'error')
self.assertEqual(result[1], {
'message': b'missing frame flags on command frame',
@@ -369,19 +375,19 @@
"""Multiple fully serviced commands with same request ID is allowed."""
reactor = makereactor()
results = []
- outstream = framing.stream()
+ outstream = framing.stream(2)
results.append(self._sendsingleframe(
- reactor, ffs(b'1 command-name eos command')))
+ reactor, ffs(b'1 1 stream-begin command-name eos command')))
result = reactor.onbytesresponseready(outstream, 1, b'response1')
self.assertaction(result, 'sendframes')
list(result[1]['framegen'])
results.append(self._sendsingleframe(
- reactor, ffs(b'1 command-name eos command')))
+ reactor, ffs(b'1 1 0 command-name eos command')))
result = reactor.onbytesresponseready(outstream, 1, b'response2')
self.assertaction(result, 'sendframes')
list(result[1]['framegen'])
results.append(self._sendsingleframe(
- reactor, ffs(b'1 command-name eos command')))
+ reactor, ffs(b'1 1 0 command-name eos command')))
result = reactor.onbytesresponseready(outstream, 1, b'response3')
self.assertaction(result, 'sendframes')
list(result[1]['framegen'])
@@ -398,8 +404,8 @@
def testconflictingrequestid(self):
"""Request ID for new command matching in-flight command is illegal."""
results = list(sendframes(makereactor(), [
- ffs(b'1 command-name have-args command'),
- ffs(b'1 command-name eos command'),
+ ffs(b'1 1 stream-begin command-name have-args command'),
+ ffs(b'1 1 0 command-name eos command'),
]))
self.assertaction(results[0], 'wantframe')
@@ -410,12 +416,12 @@
def testinterleavedcommands(self):
results = list(sendframes(makereactor(), [
- ffs(b'1 command-name have-args command1'),
- ffs(b'3 command-name have-args command3'),
- ffs(br'1 command-argument 0 \x03\x00\x03\x00foobar'),
- ffs(br'3 command-argument 0 \x03\x00\x03\x00bizbaz'),
- ffs(br'3 command-argument eoa \x03\x00\x03\x00keyval'),
- ffs(br'1 command-argument eoa \x04\x00\x03\x00key1val'),
+ ffs(b'1 1 stream-begin command-name have-args command1'),
+ ffs(b'3 1 0 command-name have-args command3'),
+ ffs(br'1 1 0 command-argument 0 \x03\x00\x03\x00foobar'),
+ ffs(br'3 1 0 command-argument 0 \x03\x00\x03\x00bizbaz'),
+ ffs(br'3 1 0 command-argument eoa \x03\x00\x03\x00keyval'),
+ ffs(br'1 1 0 command-argument eoa \x04\x00\x03\x00key1val'),
]))
self.assertEqual([t[0] for t in results], [
@@ -445,7 +451,7 @@
# command request waiting on argument data. But it doesn't handle that
# scenario yet. So this test does nothing of value.
frames = [
- ffs(b'1 command-name have-args command'),
+ ffs(b'1 1 stream-begin command-name have-args command'),
]
results = list(sendframes(makereactor(), frames))
@@ -454,8 +460,8 @@
def testincompleteargumentname(self):
"""Argument frame with incomplete name."""
frames = [
- ffs(b'1 command-name have-args command1'),
- ffs(br'1 command-argument eoa \x04\x00\xde\xadfoo'),
+ ffs(b'1 1 stream-begin command-name have-args command1'),
+ ffs(br'1 1 0 command-argument eoa \x04\x00\xde\xadfoo'),
]
results = list(sendframes(makereactor(), frames))
@@ -469,8 +475,8 @@
def testincompleteargumentvalue(self):
"""Argument frame with incomplete value."""
frames = [
- ffs(b'1 command-name have-args command'),
- ffs(br'1 command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
+ ffs(b'1 1 stream-begin command-name have-args command'),
+ ffs(br'1 1 0 command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
]
results = list(sendframes(makereactor(), frames))
@@ -485,8 +491,8 @@
# The reactor doesn't currently handle partially received commands.
# So this test is failing to do anything with request 1.
frames = [
- ffs(b'1 command-name have-data command1'),
- ffs(b'3 command-name eos command2'),
+ ffs(b'1 1 stream-begin command-name have-data command1'),
+ ffs(b'3 1 0 command-name eos command2'),
]
results = list(sendframes(makereactor(), frames))
self.assertEqual(len(results), 2)
@@ -495,8 +501,8 @@
def testmissingcommanddataframeflags(self):
frames = [
- ffs(b'1 command-name have-data command1'),
- ffs(b'1 command-data 0 data'),
+ ffs(b'1 1 stream-begin command-name have-data command1'),
+ ffs(b'1 1 0 command-data 0 data'),
]
results = list(sendframes(makereactor(), frames))
self.assertEqual(len(results), 2)
@@ -509,9 +515,9 @@
def testframefornonreceivingrequest(self):
"""Receiving a frame for a command that is not receiving is illegal."""
results = list(sendframes(makereactor(), [
- ffs(b'1 command-name eos command1'),
- ffs(b'3 command-name have-data command3'),
- ffs(b'5 command-argument eoa ignored'),
+ ffs(b'1 1 stream-begin command-name eos command1'),
+ ffs(b'3 1 0 command-name have-data command3'),
+ ffs(b'5 1 0 command-argument eoa ignored'),
]))
self.assertaction(results[2], 'error')
self.assertEqual(results[2][1], {
@@ -521,14 +527,14 @@
def testsimpleresponse(self):
"""Bytes response to command sends result frames."""
reactor = makereactor()
- instream = framing.stream()
+ instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
- outstream = framing.stream()
+ outstream = framing.stream(2)
result = reactor.onbytesresponseready(outstream, 1, b'response')
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
- b'1 bytes-response eos response',
+ b'1 2 stream-begin bytes-response eos response',
])
def testmultiframeresponse(self):
@@ -537,54 +543,54 @@
second = b'y' * 100
reactor = makereactor()
- instream = framing.stream()
+ instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
- outstream = framing.stream()
+ outstream = framing.stream(2)
result = reactor.onbytesresponseready(outstream, 1, first + second)
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
- b'1 bytes-response continuation %s' % first,
- b'1 bytes-response eos %s' % second,
+ b'1 2 stream-begin bytes-response continuation %s' % first,
+ b'1 2 0 bytes-response eos %s' % second,
])
def testapplicationerror(self):
reactor = makereactor()
- instream = framing.stream()
+ instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
- outstream = framing.stream()
+ outstream = framing.stream(2)
result = reactor.onapplicationerror(outstream, 1, b'some message')
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
- b'1 error-response application some message',
+ b'1 2 stream-begin error-response application some message',
])
def test1commanddeferresponse(self):
"""Responses when in deferred output mode are delayed until EOF."""
reactor = makereactor(deferoutput=True)
- instream = framing.stream()
+ instream = framing.stream(1)
results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
{}))
self.assertEqual(len(results), 1)
self.assertaction(results[0], 'runcommand')
- outstream = framing.stream()
+ outstream = framing.stream(2)
result = reactor.onbytesresponseready(outstream, 1, b'response')
self.assertaction(result, 'noop')
result = reactor.oninputeof()
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
- b'1 bytes-response eos response',
+ b'1 2 stream-begin bytes-response eos response',
])
def testmultiplecommanddeferresponse(self):
reactor = makereactor(deferoutput=True)
- instream = framing.stream()
+ instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'command1', {}))
list(sendcommandframes(reactor, instream, 3, b'command2', {}))
- outstream = framing.stream()
+ outstream = framing.stream(2)
result = reactor.onbytesresponseready(outstream, 1, b'response1')
self.assertaction(result, 'noop')
result = reactor.onbytesresponseready(outstream, 3, b'response2')
@@ -592,19 +598,19 @@
result = reactor.oninputeof()
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
- b'1 bytes-response eos response1',
- b'3 bytes-response eos response2'
+ b'1 2 stream-begin bytes-response eos response1',
+ b'3 2 0 bytes-response eos response2'
])
def testrequestidtracking(self):
reactor = makereactor(deferoutput=True)
- instream = framing.stream()
+ instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'command1', {}))
list(sendcommandframes(reactor, instream, 3, b'command2', {}))
list(sendcommandframes(reactor, instream, 5, b'command3', {}))
# Register results for commands out of order.
- outstream = framing.stream()
+ outstream = framing.stream(2)
reactor.onbytesresponseready(outstream, 3, b'response3')
reactor.onbytesresponseready(outstream, 1, b'response1')
reactor.onbytesresponseready(outstream, 5, b'response5')
@@ -612,15 +618,15 @@
result = reactor.oninputeof()
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
- b'3 bytes-response eos response3',
- b'1 bytes-response eos response1',
- b'5 bytes-response eos response5',
+ b'3 2 stream-begin bytes-response eos response3',
+ b'1 2 0 bytes-response eos response1',
+ b'5 2 0 bytes-response eos response5',
])
def testduplicaterequestonactivecommand(self):
"""Receiving a request ID that matches a request that isn't finished."""
reactor = makereactor()
- stream = framing.stream()
+ stream = framing.stream(1)
list(sendcommandframes(reactor, stream, 1, b'command1', {}))
results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
@@ -632,9 +638,9 @@
def testduplicaterequestonactivecommandnosend(self):
"""Same as above but we've registered a response but haven't sent it."""
reactor = makereactor()
- instream = framing.stream()
+ instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'command1', {}))
- outstream = framing.stream()
+ outstream = framing.stream(2)
reactor.onbytesresponseready(outstream, 1, b'response')
# We've registered the response but haven't sent it. From the
@@ -649,11 +655,11 @@
def testduplicaterequestargumentframe(self):
"""Variant on above except we sent an argument frame instead of name."""
reactor = makereactor()
- stream = framing.stream()
+ stream = framing.stream(1)
list(sendcommandframes(reactor, stream, 1, b'command', {}))
results = list(sendframes(reactor, [
- ffs(b'3 command-name have-args command'),
- ffs(b'1 command-argument 0 ignored'),
+ ffs(b'3 1 stream-begin command-name have-args command'),
+ ffs(b'1 1 0 command-argument 0 ignored'),
]))
self.assertaction(results[0], 'wantframe')
self.assertaction(results[1], 'error')
@@ -664,9 +670,9 @@
def testduplicaterequestaftersend(self):
"""We can use a duplicate request ID after we've sent the response."""
reactor = makereactor()
- instream = framing.stream()
+ instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'command1', {}))
- outstream = framing.stream()
+ outstream = framing.stream(2)
res = reactor.onbytesresponseready(outstream, 1, b'response')
list(res[1]['framegen'])