comparison mercurial/wireprotov2peer.py @ 37719:a656cba08a04

wireprotov2: move response handling out of httppeer

And fix some bugs while we're here.

The code for processing response data from the unified framing protocol is mostly peer agnostic. The peer-specific bits are the configuration of the client reactor and how I/O is performed. I initially implemented things in httppeer for expediency.

This commit establishes a module for holding the peer API level code for the framing based protocol. Inside this module we have a class to help coordinate higher-level activities, such as managing response objects.

The client handler bits could be rolled into clientreactor. However, I want clientreactor to be sans I/O and I want it to only be concerned with protocol-level details, not higher-level concepts like how protocol events are converted into peer API concepts. I want clientreactor to receive a frame and then tell the caller what should probably be done about it. If we start putting things like future resolution into clientreactor, we'll constrain how the protocol can be used (e.g. by requiring futures).

The new code is loosely based on what was in httppeer before. I changed things a bit around response handling. We now buffer the entire response "body" and then handle it as one atomic unit. This fixed a bug around decoding CBOR data that spanned multiple frames. I also fixed an off-by-one bug where we failed to read a single-byte CBOR value at the end of the stream. That's why tests have changed.

The new state of httppeer is much cleaner. It is largely agnostic about framing protocol implementation details. That's how it should be: the framing protocol is designed to be largely transport agnostic. We want peers merely putting bytes on the wire and telling the framing protocol where to read response data from.

There's still a bit of work to be done here, especially for representing responses. But at least we're a step closer to having a higher-level peer interface that can be plugged into the SSH peer someday.

I initially added this class to wireprotoframing. However, we'll eventually need version 2 specific functions to convert CBOR responses into data structures expected by the code calling commands. This needs to live somewhere. Since that code would be shared across peers, we need a common module. We have wireprotov1peer for the equivalent version 1 code. So I decided to establish wireprotov2peer.

Differential Revision: https://phab.mercurial-scm.org/D3379
author Gregory Szorc <gregory.szorc@gmail.com>
date Sat, 14 Apr 2018 11:50:19 -0700
parents
children d715a85003c8
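The split described in the commit message (a sans-I/O reactor that only reports what should happen, and a handler that turns those reports into peer-level results) can be illustrated with a small sketch. This is not Mercurial code: `ToyReactor`, `ToyHandler`, `register`, and the `(requestid, payload, eos)` frame tuple are hypothetical names invented for illustration, and `concurrent.futures.Future` from the Python 3 standard library stands in for whatever future type a peer would supply.

```python
from concurrent.futures import Future


class ToyReactor(object):
    """Hypothetical sans-I/O reactor: it only reports what should happen."""

    def onframerecv(self, frame):
        # A frame is a (requestid, payload, eos) tuple in this sketch.
        requestid, payload, eos = frame
        return 'responsedata', {'data': payload, 'eos': eos}


class ToyHandler(object):
    """Hypothetical handler: maps reactor actions onto futures."""

    def __init__(self, reactor):
        self._reactor = reactor
        self._futures = {}
        self._buffers = {}

    def register(self, requestid):
        # Track a future and an empty buffer for a new request.
        f = Future()
        self._futures[requestid] = f
        self._buffers[requestid] = []
        return f

    def processframe(self, frame):
        action, meta = self._reactor.onframerecv(frame)
        if action != 'responsedata':
            raise RuntimeError('unhandled action: %s' % action)

        requestid = frame[0]
        self._buffers[requestid].append(meta['data'])

        if meta['eos']:
            # Only the handler knows about futures; the reactor never does.
            body = b''.join(self._buffers.pop(requestid))
            self._futures.pop(requestid).set_result(body)


handler = ToyHandler(ToyReactor())
future = handler.register(1)
handler.processframe((1, b'hello ', False))
handler.processframe((1, b'world', True))
result = future.result()
```

Because the toy reactor never touches a future, a different caller could consume the same `(action, meta)` pairs with callbacks or blocking reads instead, which is the flexibility the commit message argues for.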
37718:ad1c07008e0b 37719:a656cba08a04
# wireprotov2peer.py - client side code for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

from .i18n import _
from .thirdparty import (
    cbor,
)
from . import (
    error,
    util,
    wireprotoframing,
)

class clienthandler(object):
    """Object to handle higher-level client activities.

    The ``clientreactor`` is used to hold low-level state about the frame-based
    protocol, such as which requests and streams are active. This type is used
    for higher-level operations, such as reading frames from a socket, exposing
    and managing a higher-level primitive for representing command responses,
    etc. This class is what peers should probably use to bridge wire activity
    with the higher-level peer API.
    """

    def __init__(self, ui, clientreactor):
        self._ui = ui
        self._reactor = clientreactor
        self._requests = {}
        self._futures = {}
        self._responses = {}

    def callcommand(self, command, args, f):
        """Register a request to call a command.

        Returns an iterable of frames that should be sent over the wire.
        """
        request, action, meta = self._reactor.callcommand(command, args)

        if action != 'noop':
            raise error.ProgrammingError('%s not yet supported' % action)

        rid = request.requestid
        self._requests[rid] = request
        self._futures[rid] = f
        self._responses[rid] = {
            'cbor': False,
            'b': util.bytesio(),
        }

        return iter(())

    def flushcommands(self):
        """Flush all queued commands.

        Returns an iterable of frames that should be sent over the wire.
        """
        action, meta = self._reactor.flushcommands()

        if action != 'sendframes':
            raise error.ProgrammingError('%s not yet supported' % action)

        return meta['framegen']

    def readframe(self, fh):
        """Attempt to read and process a frame.

        Returns None if no frame was read. Presumably this means EOF.
        """
        frame = wireprotoframing.readframe(fh)
        if frame is None:
            # TODO tell reactor?
            return

        self._ui.note(_('received %r\n') % frame)
        self._processframe(frame)

        return True

    def _processframe(self, frame):
        """Process a single read frame."""

        action, meta = self._reactor.onframerecv(frame)

        if action == 'error':
            e = error.RepoError(meta['message'])

            if frame.requestid in self._futures:
                self._futures[frame.requestid].set_exception(e)
            else:
                raise e

        if frame.requestid not in self._requests:
            raise error.ProgrammingError(
                'received frame for unknown request; this is either a bug in '
                'the clientreactor not screening for this or this instance was '
                'never told about this request: %r' % frame)

        response = self._responses[frame.requestid]

        if action == 'responsedata':
            response['b'].write(meta['data'])

            if meta['cbor']:
                response['cbor'] = True

            if meta['eos']:
                if meta['cbor']:
                    # If CBOR, decode every object.
                    b = response['b']

                    size = b.tell()
                    b.seek(0)

                    decoder = cbor.CBORDecoder(b)

                    result = []
                    while b.tell() < size:
                        result.append(decoder.decode())
                else:
                    result = [response['b'].getvalue()]

                self._futures[frame.requestid].set_result(result)

                del self._requests[frame.requestid]
                del self._futures[frame.requestid]

        else:
            raise error.ProgrammingError(
                'unhandled action from clientreactor: %s' % action)
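
The end-of-stream branch of `_processframe` buffers every chunk, records the buffer size, rewinds, and decodes until the read position reaches that size. A minimal, self-contained sketch of that loop follows. It assumes a toy format (a 1-byte length prefix followed by the payload) in place of CBOR so that it runs without Mercurial or a CBOR library; `encodeitem`, `decodeitem`, and `decodebuffered` are hypothetical helpers, not part of this module.

```python
import io
import struct


def encodeitem(value):
    """Hypothetical stand-in encoder: 1-byte length prefix, then payload."""
    return struct.pack('>B', len(value)) + value


def decodeitem(fh):
    """Decode one length-prefixed item from the current file position."""
    (length,) = struct.unpack('>B', fh.read(1))
    return fh.read(length)


def decodebuffered(chunks):
    """Buffer every chunk, then decode all items once the stream has ended."""
    b = io.BytesIO()
    for chunk in chunks:
        b.write(chunk)

    # Same shape as the end-of-stream branch above: remember the size,
    # rewind, and decode until the read position reaches the end.
    size = b.tell()
    b.seek(0)

    result = []
    while b.tell() < size:
        result.append(decodeitem(b))
    return result


# The final item encodes to a single byte, loosely mirroring the
# "single-byte value at the end of the stream" case from the commit message.
payload = encodeitem(b'changeset') + encodeitem(b'x') + encodeitem(b'')

# Split the payload at an arbitrary point, as if it spanned two frames.
frames = [payload[:4], payload[4:]]
items = decodebuffered(frames)
```

Because decoding starts only after the last chunk has been written, the first item is recovered intact even though it straddles the two chunks, and the `b.tell() < size` condition still admits the trailing one-byte item.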