wireproto: explicitly track which requests are active
We previously only tracked which requests are receiving. A
misbehaving client could accidentally have multiple requests with
the same ID in flight.
We now explicitly track which request IDs are currently active.
We make it illegal to receive a frame associated with a request
ID that has already been dispatched.
Differential Revision: https://phab.mercurial-scm.org/D2901
--- a/mercurial/wireprotoframing.py Thu Mar 15 16:09:58 2018 -0700
+++ b/mercurial/wireprotoframing.py Thu Mar 15 18:05:49 2018 -0700
@@ -472,6 +472,10 @@
self._bufferedframegens = []
# request id -> dict of commands that are actively being received.
self._receivingcommands = {}
+ # Request IDs that have been received and are actively being processed.
+ # Once all output for a request has been sent, it is removed from this
+ # set.
+ self._activecommands = set()
def onframerecv(self, frame):
"""Process a frame that has been received off the wire.
@@ -496,14 +500,20 @@
The raw bytes response is passed as an argument.
"""
- framegen = createbytesresponseframesfrombytes(requestid, data)
+ def sendframes():
+ for frame in createbytesresponseframesfrombytes(requestid, data):
+ yield frame
+
+ self._activecommands.remove(requestid)
+
+ result = sendframes()
if self._deferoutput:
- self._bufferedframegens.append(framegen)
+ self._bufferedframegens.append(result)
return 'noop', {}
else:
return 'sendframes', {
- 'framegen': framegen,
+ 'framegen': result,
}
def oninputeof(self):
@@ -546,6 +556,9 @@
else:
self._state = 'idle'
+ assert requestid not in self._activecommands
+ self._activecommands.add(requestid)
+
return 'runcommand', {
'requestid': requestid,
'command': entry['command'],
@@ -571,6 +584,11 @@
return self._makeerrorresult(
_('request with ID %d already received') % frame.requestid)
+ if frame.requestid in self._activecommands:
+ self._state = 'errored'
+ return self._makeerrorresult((
+ _('request with ID %d is already active') % frame.requestid))
+
expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
@@ -599,7 +617,13 @@
return self._onframeidle(frame)
# All other frames should be related to a command that is currently
- # receiving.
+ # receiving but is not active.
+ if frame.requestid in self._activecommands:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('received frame for request that is still active: %d') %
+ frame.requestid)
+
if frame.requestid not in self._receivingcommands:
self._state = 'errored'
return self._makeerrorresult(
--- a/tests/test-wireproto-serverreactor.py Thu Mar 15 16:09:58 2018 -0700
+++ b/tests/test-wireproto-serverreactor.py Thu Mar 15 18:05:49 2018 -0700
@@ -478,11 +478,11 @@
results = list(sendframes(makereactor(), [
ffs(b'1 command-name eos command1'),
ffs(b'3 command-name have-data command3'),
- ffs(b'1 command-argument eoa ignored'),
+ ffs(b'5 command-argument eoa ignored'),
]))
self.assertaction(results[2], 'error')
self.assertEqual(results[2][1], {
- 'message': b'received frame for request that is not receiving: 1',
+ 'message': b'received frame for request that is not receiving: 5',
})
def testsimpleresponse(self):
@@ -571,6 +571,56 @@
b'5 bytes-response eos response5',
])
+ def testduplicaterequestonactivecommand(self):
+ """Receiving a request ID that matches a request that isn't finished."""
+ reactor = makereactor()
+ list(sendcommandframes(reactor, 1, b'command1', {}))
+ results = list(sendcommandframes(reactor, 1, b'command1', {}))
+
+ self.assertaction(results[0], 'error')
+ self.assertEqual(results[0][1], {
+ 'message': b'request with ID 1 is already active',
+ })
+
+ def testduplicaterequestonactivecommandnosend(self):
+ """Same as above but we've registered a response but haven't sent it."""
+ reactor = makereactor()
+ list(sendcommandframes(reactor, 1, b'command1', {}))
+ reactor.onbytesresponseready(1, b'response')
+
+ # We've registered the response but haven't sent it. From the
+ # perspective of the reactor, the command is still active.
+
+ results = list(sendcommandframes(reactor, 1, b'command1', {}))
+ self.assertaction(results[0], 'error')
+ self.assertEqual(results[0][1], {
+ 'message': b'request with ID 1 is already active',
+ })
+
+ def testduplicaterequestargumentframe(self):
+ """Variant on above except we sent an argument frame instead of name."""
+ reactor = makereactor()
+ list(sendcommandframes(reactor, 1, b'command', {}))
+ results = list(sendframes(reactor, [
+ ffs(b'3 command-name have-args command'),
+ ffs(b'1 command-argument 0 ignored'),
+ ]))
+ self.assertaction(results[0], 'wantframe')
+ self.assertaction(results[1], 'error')
+ self.assertEqual(results[1][1], {
+ 'message': 'received frame for request that is still active: 1',
+ })
+
+ def testduplicaterequestaftersend(self):
+ """We can use a duplicate request ID after we've sent the response."""
+ reactor = makereactor()
+ list(sendcommandframes(reactor, 1, b'command1', {}))
+ res = reactor.onbytesresponseready(1, b'response')
+ list(res[1]['framegen'])
+
+ results = list(sendcommandframes(reactor, 1, b'command1', {}))
+ self.assertaction(results[0], 'runcommand')
+
if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)