changeset 37063:39304dd63589

wireproto: explicitly track which requests are active We previously only tracked which requests are receiving. A misbehaving client could accidentally have multiple requests with the same ID in flight. We now explicitly track which request IDs are currently active. We make it illegal to receive a frame associated with a request ID that has already been dispatched. Differential Revision: https://phab.mercurial-scm.org/D2901
author Gregory Szorc <gregory.szorc@gmail.com>
date Thu, 15 Mar 2018 18:05:49 -0700
parents fe4c944f95bb
children 434e520adb8c
files mercurial/wireprotoframing.py tests/test-wireproto-serverreactor.py
diffstat 2 files changed, 80 insertions(+), 6 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/wireprotoframing.py	Thu Mar 15 16:09:58 2018 -0700
+++ b/mercurial/wireprotoframing.py	Thu Mar 15 18:05:49 2018 -0700
@@ -472,6 +472,10 @@
         self._bufferedframegens = []
         # request id -> dict of commands that are actively being received.
         self._receivingcommands = {}
+        # Request IDs that have been received and are actively being processed.
+        # Once all output for a request has been sent, it is removed from this
+        # set.
+        self._activecommands = set()
 
     def onframerecv(self, frame):
         """Process a frame that has been received off the wire.
@@ -496,14 +500,20 @@
 
         The raw bytes response is passed as an argument.
         """
-        framegen = createbytesresponseframesfrombytes(requestid, data)
+        def sendframes():
+            for frame in createbytesresponseframesfrombytes(requestid, data):
+                yield frame
+
+            self._activecommands.remove(requestid)
+
+        result = sendframes()
 
         if self._deferoutput:
-            self._bufferedframegens.append(framegen)
+            self._bufferedframegens.append(result)
             return 'noop', {}
         else:
             return 'sendframes', {
-                'framegen': framegen,
+                'framegen': result,
             }
 
     def oninputeof(self):
@@ -546,6 +556,9 @@
         else:
             self._state = 'idle'
 
+        assert requestid not in self._activecommands
+        self._activecommands.add(requestid)
+
         return 'runcommand', {
             'requestid': requestid,
             'command': entry['command'],
@@ -571,6 +584,11 @@
             return self._makeerrorresult(
                 _('request with ID %d already received') % frame.requestid)
 
+        if frame.requestid in self._activecommands:
+            self._state = 'errored'
+            return self._makeerrorresult((
+                _('request with ID %d is already active') % frame.requestid))
+
         expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
         expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
 
@@ -599,7 +617,13 @@
             return self._onframeidle(frame)
 
         # All other frames should be related to a command that is currently
-        # receiving.
+        # receiving but is not active.
+        if frame.requestid in self._activecommands:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('received frame for request that is still active: %d') %
+                frame.requestid)
+
         if frame.requestid not in self._receivingcommands:
             self._state = 'errored'
             return self._makeerrorresult(
--- a/tests/test-wireproto-serverreactor.py	Thu Mar 15 16:09:58 2018 -0700
+++ b/tests/test-wireproto-serverreactor.py	Thu Mar 15 18:05:49 2018 -0700
@@ -478,11 +478,11 @@
         results = list(sendframes(makereactor(), [
             ffs(b'1 command-name eos command1'),
             ffs(b'3 command-name have-data command3'),
-            ffs(b'1 command-argument eoa ignored'),
+            ffs(b'5 command-argument eoa ignored'),
         ]))
         self.assertaction(results[2], 'error')
         self.assertEqual(results[2][1], {
-            'message': b'received frame for request that is not receiving: 1',
+            'message': b'received frame for request that is not receiving: 5',
         })
 
     def testsimpleresponse(self):
@@ -571,6 +571,56 @@
             b'5 bytes-response eos response5',
         ])
 
+    def testduplicaterequestonactivecommand(self):
+        """Receiving a request ID that matches a request that isn't finished."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command1', {}))
+        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+
+        self.assertaction(results[0], 'error')
+        self.assertEqual(results[0][1], {
+            'message': b'request with ID 1 is already active',
+        })
+
+    def testduplicaterequestonactivecommandnosend(self):
+        """Same as above but we've registered a response but haven't sent it."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command1', {}))
+        reactor.onbytesresponseready(1, b'response')
+
+        # We've registered the response but haven't sent it. From the
+        # perspective of the reactor, the command is still active.
+
+        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+        self.assertaction(results[0], 'error')
+        self.assertEqual(results[0][1], {
+            'message': b'request with ID 1 is already active',
+        })
+
+    def testduplicaterequestargumentframe(self):
+        """Variant on above except we sent an argument frame instead of name."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command', {}))
+        results = list(sendframes(reactor, [
+            ffs(b'3 command-name have-args command'),
+            ffs(b'1 command-argument 0 ignored'),
+        ]))
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': 'received frame for request that is still active: 1',
+        })
+
+    def testduplicaterequestaftersend(self):
+        """We can use a duplicate request ID after we've sent the response."""
+        reactor = makereactor()
+        list(sendcommandframes(reactor, 1, b'command1', {}))
+        res = reactor.onbytesresponseready(1, b'response')
+        list(res[1]['framegen'])
+
+        results = list(sendcommandframes(reactor, 1, b'command1', {}))
+        self.assertaction(results[0], 'runcommand')
+
 if __name__ == '__main__':
     import silenttestrunner
     silenttestrunner.main(__name__)