changeset 40137:ed4ebbb98ca0

wireprotov2: raise exception in objects() if future has been resolved Differential Revision: https://phab.mercurial-scm.org/D4926
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 08 Oct 2018 15:19:32 -0700
parents 3a6d6c54bd81
children b5bf3dd6ec5b
files mercurial/wireprotov2peer.py
diffstat 1 files changed, 22 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/wireprotov2peer.py	Fri Oct 05 23:49:18 2018 +0000
+++ b/mercurial/wireprotov2peer.py	Mon Oct 08 15:19:32 2018 -0700
@@ -135,6 +135,7 @@
         self._serviceable = threading.Event()
 
         self._pendingevents = []
+        self._pendingerror = None
         self._decoder = cborutil.bufferingdecoder()
         self._seeninitial = False
         self._redirect = None
@@ -169,6 +170,12 @@
 
             self._serviceable.set()
 
+    def _onerror(self, e):
+        self._pendingerror = e
+
+        with self._lock:
+            self._serviceable.set()
+
     def _handleinitial(self, o):
         self._seeninitial = True
         if o[b'status'] == b'ok':
@@ -212,6 +219,9 @@
             # our state.
             self._serviceable.wait(1.0)
 
+            if self._pendingerror:
+                raise self._pendingerror
+
             with self._lock:
                 self._serviceable.clear()
 
@@ -342,9 +352,18 @@
             try:
                 self._processresponsedata(frame, meta, response)
             except BaseException as e:
-                self._futures[frame.requestid].set_exception(e)
-                del self._futures[frame.requestid]
-                response._oninputcomplete()
+                # If an exception occurs before the future is resolved,
+                # fail the future. Otherwise, we stuff the exception on
+                # the response object so it can be raised during objects()
+                # iteration. If nothing is consuming objects(), we could
+                # silently swallow this exception. That's a risk we'll have to
+                # take.
+                if frame.requestid in self._futures:
+                    self._futures[frame.requestid].set_exception(e)
+                    del self._futures[frame.requestid]
+                    response._oninputcomplete()
+                else:
+                    response._onerror(e)
         else:
             raise error.ProgrammingError(
                 'unhandled action from clientreactor: %s' % action)