wireprotov2peer: stream decoded responses
Previously, wire protocol version 2 would buffer all response data.
Only once all data was received did we CBOR decode it and resolve
the future associated with the command. This was obviously not
desirable. In future commits that introduce large response payloads,
this caused significant memory bloat and slowed down client
operations due to waiting on the server.
This commit refactors the response handling code so that response
data can be streamed.
Command response objects now contain a buffered CBOR decoder. As
new data arrives, it is fed into the decoder. Decoded objects are
made available to the generator as they are decoded.
Because there is a separate thread processing incoming frames and
feeding data into the response object, there is the potential for
race conditions when mutating response objects. So a lock has been
added to guard access to critical state variables.
Because the generator emitting decoded objects needs to wait on
those objects to become available, we've added an Event for the
generator to wait on so it doesn't busy loop. This does mean
there is the potential for deadlocks. And I'm pretty sure they can
occur in some scenarios. We already have a handful of TODOs around
this. But I've added some more. Fixing this will likely require
moving the background thread receiving frames into clienthandler.
We likely would have done this anyway when implementing the client
bits for the SSH transport.
Test output changes because the initial CBOR map holding the overall
response state is now always handled internally by the response
object.
Differential Revision: https://phab.mercurial-scm.org/D4474
--- a/mercurial/debugcommands.py Wed Aug 29 16:43:17 2018 -0700
+++ b/mercurial/debugcommands.py Wed Aug 29 15:17:11 2018 -0700
@@ -3240,7 +3240,7 @@
res = e.callcommand(command, args).result()
if isinstance(res, wireprotov2peer.commandresponse):
- val = list(res.cborobjects())
+ val = res.objects()
ui.status(_('response: %s\n') %
stringutil.pprint(val, bprefix=True, indent=2))
else:
--- a/mercurial/wireprotov2peer.py Wed Aug 29 16:43:17 2018 -0700
+++ b/mercurial/wireprotov2peer.py Wed Aug 29 15:17:11 2018 -0700
@@ -7,11 +7,12 @@
from __future__ import absolute_import
+import threading
+
from .i18n import _
from . import (
encoding,
error,
- util,
wireprotoframing,
)
from .utils import (
@@ -34,20 +35,101 @@
return b''.join(chunks)
class commandresponse(object):
- """Represents the response to a command request."""
+ """Represents the response to a command request.
+
+ Instances track the state of the command and hold its results.
+
+ An external entity is required to update the state of the object when
+ events occur.
+ """
def __init__(self, requestid, command):
self.requestid = requestid
self.command = command
- self.b = util.bytesio()
+ # Whether all remote input related to this command has been
+ # received.
+ self._inputcomplete = False
+
+ # We have a lock that is acquired when important object state is
+ # mutated. This is to prevent race conditions between 1 thread
+ # sending us new data and another consuming it.
+ self._lock = threading.RLock()
+
+ # An event is set when state of the object changes. This event
+ # is waited on by the generator emitting objects.
+ self._serviceable = threading.Event()
+
+ self._pendingevents = []
+ self._decoder = cborutil.bufferingdecoder()
+ self._seeninitial = False
+
+ def _oninputcomplete(self):
+ with self._lock:
+ self._inputcomplete = True
+ self._serviceable.set()
+
+ def _onresponsedata(self, data):
+ available, readcount, wanted = self._decoder.decode(data)
+
+ if not available:
+ return
+
+ with self._lock:
+ for o in self._decoder.getavailable():
+ if not self._seeninitial:
+ self._handleinitial(o)
+ continue
+
+ self._pendingevents.append(o)
+
+ self._serviceable.set()
- def cborobjects(self):
- """Obtain decoded CBOR objects from this response."""
- self.b.seek(0)
+ def _handleinitial(self, o):
+ self._seeninitial = True
+ if o[b'status'] == 'ok':
+ return
+
+ atoms = [{'msg': o[b'error'][b'message']}]
+ if b'args' in o[b'error']:
+ atoms[0]['args'] = o[b'error'][b'args']
+
+ raise error.RepoError(formatrichmessage(atoms))
+
+ def objects(self):
+ """Obtained decoded objects from this response.
+
+ This is a generator of data structures that were decoded from the
+ command response.
+
+ Obtaining the next member of the generator may block due to waiting
+ on external data to become available.
- for v in cborutil.decodeall(self.b.getvalue()):
- yield v
+ If the server encountered an error in the middle of serving the data
+ or if another error occurred, an exception may be raised when
+ advancing the generator.
+ """
+ while True:
+ # TODO this can infinite loop if self._inputcomplete is never
+ # set. We likely want to tie the lifetime of this object/state
+ # to that of the background thread receiving frames and updating
+ # our state.
+ self._serviceable.wait(1.0)
+
+ with self._lock:
+ self._serviceable.clear()
+
+ # Make copies because objects could be mutated during
+ # iteration.
+ stop = self._inputcomplete
+ pending = list(self._pendingevents)
+ self._pendingevents[:] = []
+
+ for o in pending:
+ yield o
+
+ if stop:
+ break
class clienthandler(object):
"""Object to handle higher-level client activities.
@@ -80,6 +162,8 @@
rid = request.requestid
self._requests[rid] = request
self._futures[rid] = f
+ # TODO we need some kind of lifetime on response instances otherwise
+ # objects() may deadlock.
self._responses[rid] = commandresponse(rid, command)
return iter(())
@@ -119,8 +203,12 @@
if action == 'error':
e = error.RepoError(meta['message'])
+ if frame.requestid in self._responses:
+ self._responses[frame.requestid]._oninputcomplete()
+
if frame.requestid in self._futures:
self._futures[frame.requestid].set_exception(e)
+ del self._futures[frame.requestid]
else:
raise e
@@ -141,39 +229,32 @@
self._processresponsedata(frame, meta, response)
except BaseException as e:
self._futures[frame.requestid].set_exception(e)
+ del self._futures[frame.requestid]
+ response._oninputcomplete()
else:
raise error.ProgrammingError(
'unhandled action from clientreactor: %s' % action)
def _processresponsedata(self, frame, meta, response):
- # This buffers all data until end of stream is received. This
- # is bad for performance.
- # TODO make response data streamable
- response.b.write(meta['data'])
+ # This can raise. The caller can handle it.
+ response._onresponsedata(meta['data'])
if meta['eos']:
- # If the command has a decoder, resolve the future to the
- # decoded value. Otherwise resolve to the rich response object.
- decoder = COMMAND_DECODERS.get(response.command)
-
- # TODO consider always resolving the overall status map.
- if decoder:
- objs = response.cborobjects()
-
- overall = next(objs)
+ response._oninputcomplete()
+ del self._requests[frame.requestid]
- if overall['status'] == 'ok':
- self._futures[frame.requestid].set_result(decoder(objs))
- else:
- atoms = [{'msg': overall['error']['message']}]
- if 'args' in overall['error']:
- atoms[0]['args'] = overall['error']['args']
- e = error.RepoError(formatrichmessage(atoms))
- self._futures[frame.requestid].set_exception(e)
- else:
- self._futures[frame.requestid].set_result(response)
+ # If the command has a decoder, we wait until all input has been
+ # received before resolving the future. Otherwise we resolve the
+ # future immediately.
+ if frame.requestid not in self._futures:
+ return
- del self._requests[frame.requestid]
+ if response.command not in COMMAND_DECODERS:
+ self._futures[frame.requestid].set_result(response.objects())
+ del self._futures[frame.requestid]
+ elif response._inputcomplete:
+ decoded = COMMAND_DECODERS[response.command](response.objects())
+ self._futures[frame.requestid].set_result(decoded)
del self._futures[frame.requestid]
def decodebranchmap(objs):
--- a/tests/test-http-api-httpv2.t Wed Aug 29 16:43:17 2018 -0700
+++ b/tests/test-http-api-httpv2.t Wed Aug 29 15:17:11 2018 -0700
@@ -225,10 +225,7 @@
s> 0\r\n
s> \r\n
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- response: [
- {
- b'status': b'ok'
- },
+ response: gen[
b'customreadonly bytes response'
]
--- a/tests/test-wireproto-command-capabilities.t Wed Aug 29 16:43:17 2018 -0700
+++ b/tests/test-wireproto-command-capabilities.t Wed Aug 29 15:17:11 2018 -0700
@@ -349,10 +349,7 @@
s> 0\r\n
s> \r\n
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- response: [
- {
- b'status': b'ok'
- },
+ response: gen[
{
b'commands': {
b'branchmap': {