comparison mercurial/wireprotoframing.py @ 37055:61393f888dfe

wireproto: define and implement responses in framing protocol. Previously, we only had client-side frame types defined. This commit defines and implements basic support for server-side frame types. We introduce two frame types - one for representing the raw bytes result of a command and another for representing error results. The types are quite primitive and behavior will expand over time. But you have to start somewhere. Our server reactor gains methods to react to an intent to send a response. Again, following the "sans I/O" pattern, the reactor doesn't actually send the data. Instead, it gives the caller a generator of frames that it can send out over the wire. Differential Revision: https://phab.mercurial-scm.org/D2858
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 14 Mar 2018 13:57:52 -0700
parents 8c3c47362934
children 861e9d37e56e
comparison
equal deleted inserted replaced
37054:e7a012b60d6e 37055:61393f888dfe
# Default limit used when splitting data into frames: frame generators in
# this module emit chunks of at most this many bytes unless told otherwise.
DEFAULT_MAX_FRAME_SIZE = 32768

# Numeric identifiers of the known frame types. 0x01-0x03 carry a command
# (name, arguments, data); 0x04 and 0x05 carry a response (raw bytes result
# or error).
FRAME_TYPE_COMMAND_NAME = 0x01
FRAME_TYPE_COMMAND_ARGUMENT = 0x02
FRAME_TYPE_COMMAND_DATA = 0x03
FRAME_TYPE_BYTES_RESPONSE = 0x04
FRAME_TYPE_ERROR_RESPONSE = 0x05

# Maps the human-readable name of each frame type to its numeric identifier.
FRAME_TYPES = {
    b'command-name': FRAME_TYPE_COMMAND_NAME,
    b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
    b'command-data': FRAME_TYPE_COMMAND_DATA,
    b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
    b'error-response': FRAME_TYPE_ERROR_RESPONSE,
}
34 38
35 FLAG_COMMAND_NAME_EOS = 0x01 39 FLAG_COMMAND_NAME_EOS = 0x01
36 FLAG_COMMAND_NAME_HAVE_ARGS = 0x02 40 FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
37 FLAG_COMMAND_NAME_HAVE_DATA = 0x04 41 FLAG_COMMAND_NAME_HAVE_DATA = 0x04
54 FLAG_COMMAND_DATA_EOS = 0x02 58 FLAG_COMMAND_DATA_EOS = 0x02
55 59
56 FLAGS_COMMAND_DATA = { 60 FLAGS_COMMAND_DATA = {
57 b'continuation': FLAG_COMMAND_DATA_CONTINUATION, 61 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
58 b'eos': FLAG_COMMAND_DATA_EOS, 62 b'eos': FLAG_COMMAND_DATA_EOS,
63 }
64
# Flags for bytes response frames. The continuation flag is set on every
# frame that is followed by more response data; the eos flag is set on the
# final (or only) frame of a response.
FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
FLAG_BYTES_RESPONSE_EOS = 0x02

# Maps bytes response flag names to their bit values.
FLAGS_BYTES_RESPONSE = {
    b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
    b'eos': FLAG_BYTES_RESPONSE_EOS,
}

# Flags for error response frames. They are independent bits, so a frame
# may be marked as a protocol error, an application error, both or neither.
FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
FLAG_ERROR_RESPONSE_APPLICATION = 0x02

# Maps error response flag names to their bit values.
FLAGS_ERROR_RESPONSE = {
    b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
    b'application': FLAG_ERROR_RESPONSE_APPLICATION,
}
60 80
# Maps frame types to their available flags.
# Keys are the numeric FRAME_TYPE_* identifiers; each value is the table of
# flags that applies to that frame type.
FRAME_TYPE_FLAGS = {
    FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
    FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
    FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
    FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
    FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
}
67 89
# Precompiled header layout: two little-endian unsigned 16-bit integers.
# NOTE(review): presumably the lengths of an argument's name and value --
# confirm against the code that packs/unpacks command-argument frames.
ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
69 91
70 def makeframe(frametype, frameflags, payload): 92 def makeframe(frametype, frameflags, payload):
200 yield makeframe(FRAME_TYPE_COMMAND_DATA, flags, data) 222 yield makeframe(FRAME_TYPE_COMMAND_DATA, flags, data)
201 223
202 if done: 224 if done:
203 break 225 break
204 226
def createbytesresponseframesfrombytes(data,
                                       maxframesize=DEFAULT_MAX_FRAME_SIZE):
    """Create raw frames to send a bytes response from static bytes input.

    ``data`` is split into consecutive chunks of at most ``maxframesize``
    bytes and one bytes response frame is emitted per chunk. Every frame
    except the last carries the continuation flag; the last frame carries
    the end-of-stream flag. Empty ``data`` still produces a single
    end-of-stream frame with an empty payload.

    Returns a generator of bytearrays.

    Raises ``error.ProgrammingError`` if ``maxframesize`` is not positive.
    Because this is a generator, the error surfaces when the generator is
    first iterated, not when this function is called.
    """
    # A non-positive chunk size would never advance ``offset`` below (or
    # would slice relative to the end of ``data``), producing an endless or
    # corrupt stream of frames. Refuse it up front.
    if maxframesize <= 0:
        raise error.ProgrammingError('maxframesize must be a positive integer')

    total = len(data)
    offset = 0

    while True:
        chunk = data[offset:offset + maxframesize]
        offset += len(chunk)

        # Data that fits in a single frame (including empty data) reaches
        # this branch on the first iteration, so no special case is needed.
        if offset >= total:
            yield makeframe(FRAME_TYPE_BYTES_RESPONSE,
                            FLAG_BYTES_RESPONSE_EOS, chunk)
            return

        yield makeframe(FRAME_TYPE_BYTES_RESPONSE,
                        FLAG_BYTES_RESPONSE_CONTINUATION, chunk)
255
def createerrorframe(msg, protocol=False, application=False):
    """Create an error response frame carrying ``msg``.

    ``protocol`` and ``application`` each switch on the corresponding error
    flag of the frame; they are independent of one another.

    This is a generator yielding exactly one frame.
    """
    # TODO properly handle frame size limits.
    assert len(msg) <= DEFAULT_MAX_FRAME_SIZE

    # Fold the requested error categories into a single flags value.
    errorflags = 0
    for requested, bit in ((protocol, FLAG_ERROR_RESPONSE_PROTOCOL),
                           (application, FLAG_ERROR_RESPONSE_APPLICATION)):
        if requested:
            errorflags |= bit

    yield makeframe(FRAME_TYPE_ERROR_RESPONSE, errorflags, msg)
267
205 class serverreactor(object): 268 class serverreactor(object):
206 """Holds state of a server handling frame-based protocol requests. 269 """Holds state of a server handling frame-based protocol requests.
207 270
208 This class is the "brain" of the unified frame-based protocol server 271 This class is the "brain" of the unified frame-based protocol server
209 component. While the protocol is stateless from the perspective of 272 component. While the protocol is stateless from the perspective of
227 a dict) specific to that action that contains more information. e.g. 290 a dict) specific to that action that contains more information. e.g.
228 if the server wants to send frames back to the client, the data structure 291 if the server wants to send frames back to the client, the data structure
229 will contain a reference to those frames. 292 will contain a reference to those frames.
230 293
231 Valid actions that consumers can be instructed to take are: 294 Valid actions that consumers can be instructed to take are:
295
296 sendframes
297 Indicates that frames should be sent to the client. The ``framegen``
298 key contains a generator of frames that should be sent. The server
299 assumes that all frames are sent to the client.
232 300
233 error 301 error
234 Indicates that an error occurred. Consumer should probably abort. 302 Indicates that an error occurred. Consumer should probably abort.
235 303
236 runcommand 304 runcommand
268 meth = handlers.get(self._state) 336 meth = handlers.get(self._state)
269 if not meth: 337 if not meth:
270 raise error.ProgrammingError('unhandled state: %s' % self._state) 338 raise error.ProgrammingError('unhandled state: %s' % self._state)
271 339
272 return meth(frametype, frameflags, payload) 340 return meth(frametype, frameflags, payload)
341
def onbytesresponseready(self, data):
    """Signal that a bytes response is ready to be sent to the client.

    ``data`` is the raw bytes response. Nothing is sent from here: the
    result is a ``sendframes`` action whose ``framegen`` entry is a
    generator of the frames the caller should send.
    """
    framegen = createbytesresponseframesfrombytes(data)
    return 'sendframes', {'framegen': framegen}
350
def onapplicationerror(self, msg):
    """Signal that an application-level error should be reported.

    Returns a ``sendframes`` action whose ``framegen`` entry is a generator
    producing an error response frame for ``msg`` with the application
    flag set.
    """
    framegen = createerrorframe(msg, application=True)
    return 'sendframes', {'framegen': framegen}
273 355
def _makeerrorresult(self, msg):
    """Build the ``error`` action result carrying ``msg`` as its message."""
    details = {'message': msg}
    return 'error', details