wireprotov2: raise exception in objects() if future has been resolved
Differential Revision: https://phab.mercurial-scm.org/D4926
--- a/mercurial/wireprotov2peer.py Fri Oct 05 23:49:18 2018 +0000
+++ b/mercurial/wireprotov2peer.py Mon Oct 08 15:19:32 2018 -0700
@@ -135,6 +135,7 @@
self._serviceable = threading.Event()
self._pendingevents = []
+ self._pendingerror = None
self._decoder = cborutil.bufferingdecoder()
self._seeninitial = False
self._redirect = None
@@ -169,6 +170,12 @@
self._serviceable.set()
+ def _onerror(self, e):
+ self._pendingerror = e
+
+ with self._lock:
+ self._serviceable.set()
+
def _handleinitial(self, o):
self._seeninitial = True
if o[b'status'] == b'ok':
@@ -212,6 +219,9 @@
# our state.
self._serviceable.wait(1.0)
+ if self._pendingerror:
+ raise self._pendingerror
+
with self._lock:
self._serviceable.clear()
@@ -342,9 +352,18 @@
try:
self._processresponsedata(frame, meta, response)
except BaseException as e:
- self._futures[frame.requestid].set_exception(e)
- del self._futures[frame.requestid]
- response._oninputcomplete()
+ # If an exception occurs before the future is resolved,
+ # fail the future. Otherwise, we stuff the exception on
+ # the response object so it can be raised during objects()
+ # iteration. If nothing is consuming objects(), we could
+ # silently swallow this exception. That's a risk we'll have to
+ # take.
+ if frame.requestid in self._futures:
+ self._futures[frame.requestid].set_exception(e)
+ del self._futures[frame.requestid]
+ response._oninputcomplete()
+ else:
+ response._onerror(e)
else:
raise error.ProgrammingError(
'unhandled action from clientreactor: %s' % action)