Mercurial > hg
view mercurial/wireprotov2peer.py @ 37720:d715a85003c8
wireprotov2: establish a type for representing command response
It will be desirable to have a higher-level type for representing
command responses. This will allow us to do nicer things.
For now, the instance encapsulates existing logic. It is still
a bit primitive. But we're slowly making things better.
Version 1 protocols have a wrapping layer that decodes the raw
string data into a data structure and that data structure is
sent to the future. Version 2 doesn't yet have this layer and
the future is receiving the raw wire response. Hence why
debugcommands needed to be taught about the response type.
Differential Revision: https://phab.mercurial-scm.org/D3380
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 14 Apr 2018 11:46:08 -0700 |
parents | a656cba08a04 |
children | f7673845b167 |
line wrap: on
line source
# wireprotov2peer.py - client side code for wire protocol version 2 # # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import absolute_import from .i18n import _ from .thirdparty import ( cbor, ) from . import ( error, util, wireprotoframing, ) class commandresponse(object): """Represents the response to a command request.""" def __init__(self, requestid, command): self.requestid = requestid self.command = command self.cbor = False self.b = util.bytesio() def cborobjects(self): """Obtain decoded CBOR objects from this response.""" size = self.b.tell() self.b.seek(0) decoder = cbor.CBORDecoder(self.b) while self.b.tell() < size: yield decoder.decode() class clienthandler(object): """Object to handle higher-level client activities. The ``clientreactor`` is used to hold low-level state about the frame-based protocol, such as which requests and streams are active. This type is used for higher-level operations, such as reading frames from a socket, exposing and managing a higher-level primitive for representing command responses, etc. This class is what peers should probably use to bridge wire activity with the higher-level peer API. """ def __init__(self, ui, clientreactor): self._ui = ui self._reactor = clientreactor self._requests = {} self._futures = {} self._responses = {} def callcommand(self, command, args, f): """Register a request to call a command. Returns an iterable of frames that should be sent over the wire. """ request, action, meta = self._reactor.callcommand(command, args) if action != 'noop': raise error.ProgrammingError('%s not yet supported' % action) rid = request.requestid self._requests[rid] = request self._futures[rid] = f self._responses[rid] = commandresponse(rid, command) return iter(()) def flushcommands(self): """Flush all queued commands. Returns an iterable of frames that should be sent over the wire. """ action, meta = self._reactor.flushcommands() if action != 'sendframes': raise error.ProgrammingError('%s not yet supported' % action) return meta['framegen'] def readframe(self, fh): """Attempt to read and process a frame. Returns None if no frame was read. Presumably this means EOF. """ frame = wireprotoframing.readframe(fh) if frame is None: # TODO tell reactor? return self._ui.note(_('received %r\n') % frame) self._processframe(frame) return True def _processframe(self, frame): """Process a single read frame.""" action, meta = self._reactor.onframerecv(frame) if action == 'error': e = error.RepoError(meta['message']) if frame.requestid in self._futures: self._futures[frame.requestid].set_exception(e) else: raise e if frame.requestid not in self._requests: raise error.ProgrammingError( 'received frame for unknown request; this is either a bug in ' 'the clientreactor not screening for this or this instance was ' 'never told about this request: %r' % frame) response = self._responses[frame.requestid] if action == 'responsedata': response.b.write(meta['data']) if meta['cbor']: response.cbor = True if meta['eos']: self._futures[frame.requestid].set_result(response) del self._requests[frame.requestid] del self._futures[frame.requestid] else: raise error.ProgrammingError( 'unhandled action from clientreactor: %s' % action)