comparison mercurial/wireprotoframing.py @ 37058:c5e9c3b47366

wireproto: support for receiving multiple requests Now that we have request IDs on each frame and a specification that allows multiple requests to be issued simultaneously, possibly interleaved, let's teach the server to deal with that. Instead of tracking the state for *the* active command request, we now track the state of each receiving command by its request ID. The multiple states in our state machine for processing each command have been collapsed into a single state for "receiving commands." Tests have been added so our branch coverage covers all meaningful branches. However, we did lose some logical coverage. The implementation of this new feature opens up the door to a server having partial command requests when end of input is reached. We will probably want a mechanism to deal with partial requests. For now, I've tracked that as a known issue in the class docstring. I've also noted an abuse vector that becomes a little bit easier to exploit with this feature. Differential Revision: https://phab.mercurial-scm.org/D2870
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 14 Mar 2018 16:53:30 -0700
parents 2ec1fb9de638
children 0a6c5cc09a88
comparison
equal deleted inserted replaced
37057:2ec1fb9de638 37058:c5e9c3b47366
325 Indicates that nothing of interest happened and the server is waiting on 325 Indicates that nothing of interest happened and the server is waiting on
326 more frames from the client before anything interesting can be done. 326 more frames from the client before anything interesting can be done.
327 327
328 noop 328 noop
329 Indicates no additional action is required. 329 Indicates no additional action is required.
330
331 Known Issues
332 ------------
333
334 There are no limits to the number of partially received commands or their
335 size. A malicious client could stream command request data and exhaust the
336 server's memory.
337
338 Partially received commands are not acted upon when end of input is
339 reached. Should the server error if it receives a partial request?
340 Should the client send a message to abort a partially transmitted request
341 to facilitate graceful shutdown?
342
343 Active requests that haven't been responded to aren't tracked. This means
344 that if we receive a command and instruct its dispatch, another command
345 with its request ID can come in over the wire and there will be a race
346 between who responds to what.
330 """ 347 """
331 348
332 def __init__(self, deferoutput=False): 349 def __init__(self, deferoutput=False):
333 """Construct a new server reactor. 350 """Construct a new server reactor.
334 351
340 sender cannot receive until all data has been transmitted. 357 sender cannot receive until all data has been transmitted.
341 """ 358 """
342 self._deferoutput = deferoutput 359 self._deferoutput = deferoutput
343 self._state = 'idle' 360 self._state = 'idle'
344 self._bufferedframegens = [] 361 self._bufferedframegens = []
345 self._activerequestid = None 362 # request id -> dict of commands that are actively being received.
346 self._activecommand = None 363 self._receivingcommands = {}
347 self._activeargs = None
348 self._activedata = None
349 self._expectingargs = None
350 self._expectingdata = None
351 self._activeargname = None
352 self._activeargchunks = None
353 364
354 def onframerecv(self, requestid, frametype, frameflags, payload): 365 def onframerecv(self, requestid, frametype, frameflags, payload):
355 """Process a frame that has been received off the wire. 366 """Process a frame that has been received off the wire.
356 367
357 Returns a dict with an ``action`` key that details what action, 368 Returns a dict with an ``action`` key that details what action,
358 if any, the consumer should take next. 369 if any, the consumer should take next.
359 """ 370 """
360 handlers = { 371 handlers = {
361 'idle': self._onframeidle, 372 'idle': self._onframeidle,
362 'command-receiving-args': self._onframereceivingargs, 373 'command-receiving': self._onframecommandreceiving,
363 'command-receiving-data': self._onframereceivingdata,
364 'errored': self._onframeerrored, 374 'errored': self._onframeerrored,
365 } 375 }
366 376
367 meth = handlers.get(self._state) 377 meth = handlers.get(self._state)
368 if not meth: 378 if not meth:
389 """Signals that end of input has been received. 399 """Signals that end of input has been received.
390 400
391 No more frames will be received. All pending activity should be 401 No more frames will be received. All pending activity should be
392 completed. 402 completed.
393 """ 403 """
404 # TODO should we do anything about in-flight commands?
405
394 if not self._deferoutput or not self._bufferedframegens: 406 if not self._deferoutput or not self._bufferedframegens:
395 return 'noop', {} 407 return 'noop', {}
396 408
397 # If we buffered all our responses, emit those. 409 # If we buffered all our responses, emit those.
398 def makegen(): 410 def makegen():
412 def _makeerrorresult(self, msg): 424 def _makeerrorresult(self, msg):
413 return 'error', { 425 return 'error', {
414 'message': msg, 426 'message': msg,
415 } 427 }
416 428
417 def _makeruncommandresult(self): 429 def _makeruncommandresult(self, requestid):
430 entry = self._receivingcommands[requestid]
431 del self._receivingcommands[requestid]
432
433 if self._receivingcommands:
434 self._state = 'command-receiving'
435 else:
436 self._state = 'idle'
437
418 return 'runcommand', { 438 return 'runcommand', {
419 'requestid': self._activerequestid, 439 'requestid': requestid,
420 'command': self._activecommand, 440 'command': entry['command'],
421 'args': self._activeargs, 441 'args': entry['args'],
422 'data': self._activedata.getvalue() if self._activedata else None, 442 'data': entry['data'].getvalue() if entry['data'] else None,
423 } 443 }
424 444
425 def _makewantframeresult(self): 445 def _makewantframeresult(self):
426 return 'wantframe', { 446 return 'wantframe', {
427 'state': self._state, 447 'state': self._state,
433 if frametype != FRAME_TYPE_COMMAND_NAME: 453 if frametype != FRAME_TYPE_COMMAND_NAME:
434 self._state = 'errored' 454 self._state = 'errored'
435 return self._makeerrorresult( 455 return self._makeerrorresult(
436 _('expected command frame; got %d') % frametype) 456 _('expected command frame; got %d') % frametype)
437 457
438 self._activerequestid = requestid 458 if requestid in self._receivingcommands:
439 self._activecommand = payload 459 self._state = 'errored'
440 self._activeargs = {} 460 return self._makeerrorresult(
441 self._activedata = None 461 _('request with ID %d already received') % requestid)
462
463 expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
464 expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
465
466 self._receivingcommands[requestid] = {
467 'command': payload,
468 'args': {},
469 'data': None,
470 'expectingargs': expectingargs,
471 'expectingdata': expectingdata,
472 }
442 473
443 if frameflags & FLAG_COMMAND_NAME_EOS: 474 if frameflags & FLAG_COMMAND_NAME_EOS:
444 return self._makeruncommandresult() 475 return self._makeruncommandresult(requestid)
445 476
446 self._expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS) 477 if expectingargs or expectingdata:
447 self._expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA) 478 self._state = 'command-receiving'
448
449 if self._expectingargs:
450 self._state = 'command-receiving-args'
451 return self._makewantframeresult()
452 elif self._expectingdata:
453 self._activedata = util.bytesio()
454 self._state = 'command-receiving-data'
455 return self._makewantframeresult() 479 return self._makewantframeresult()
456 else: 480 else:
457 self._state = 'errored' 481 self._state = 'errored'
458 return self._makeerrorresult(_('missing frame flags on ' 482 return self._makeerrorresult(_('missing frame flags on '
459 'command frame')) 483 'command frame'))
460 484
461 def _onframereceivingargs(self, requestid, frametype, frameflags, payload): 485 def _onframecommandreceiving(self, requestid, frametype, frameflags,
462 if frametype != FRAME_TYPE_COMMAND_ARGUMENT: 486 payload):
463 self._state = 'errored' 487 # It could be a new command request. Process it as such.
464 return self._makeerrorresult(_('expected command argument ' 488 if frametype == FRAME_TYPE_COMMAND_NAME:
465 'frame; got %d') % frametype) 489 return self._onframeidle(requestid, frametype, frameflags, payload)
490
491 # All other frames should be related to a command that is currently
492 # receiving.
493 if requestid not in self._receivingcommands:
494 self._state = 'errored'
495 return self._makeerrorresult(
496 _('received frame for request that is not receiving: %d') %
497 requestid)
498
499 entry = self._receivingcommands[requestid]
500
501 if frametype == FRAME_TYPE_COMMAND_ARGUMENT:
502 if not entry['expectingargs']:
503 self._state = 'errored'
504 return self._makeerrorresult(_(
505 'received command argument frame for request that is not '
506 'expecting arguments: %d') % requestid)
507
508 return self._handlecommandargsframe(requestid, entry, frametype,
509 frameflags, payload)
510
511 elif frametype == FRAME_TYPE_COMMAND_DATA:
512 if not entry['expectingdata']:
513 self._state = 'errored'
514 return self._makeerrorresult(_(
515 'received command data frame for request that is not '
516 'expecting data: %d') % requestid)
517
518 if entry['data'] is None:
519 entry['data'] = util.bytesio()
520
521 return self._handlecommanddataframe(requestid, entry, frametype,
522 frameflags, payload)
523
524 def _handlecommandargsframe(self, requestid, entry, frametype, frameflags,
525 payload):
526 # The frame and state of command should have already been validated.
527 assert frametype == FRAME_TYPE_COMMAND_ARGUMENT
466 528
467 offset = 0 529 offset = 0
468 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload) 530 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
469 offset += ARGUMENT_FRAME_HEADER.size 531 offset += ARGUMENT_FRAME_HEADER.size
470 532
481 543
482 # Argument value spans multiple frames. Record our active state 544 # Argument value spans multiple frames. Record our active state
483 # and wait for the next frame. 545 # and wait for the next frame.
484 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION: 546 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
485 raise error.ProgrammingError('not yet implemented') 547 raise error.ProgrammingError('not yet implemented')
486 self._activeargname = argname
487 self._activeargchunks = [argvalue]
488 self._state = 'command-arg-continuation'
489 return self._makewantframeresult()
490 548
491 # Common case: the argument value is completely contained in this 549 # Common case: the argument value is completely contained in this
492 # frame. 550 # frame.
493 551
494 if len(argvalue) != valuesize: 552 if len(argvalue) != valuesize:
495 self._state = 'errored' 553 self._state = 'errored'
496 return self._makeerrorresult(_('malformed argument frame: ' 554 return self._makeerrorresult(_('malformed argument frame: '
497 'partial argument value')) 555 'partial argument value'))
498 556
499 self._activeargs[argname] = argvalue 557 entry['args'][argname] = argvalue
500 558
501 if frameflags & FLAG_COMMAND_ARGUMENT_EOA: 559 if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
502 if self._expectingdata: 560 if entry['expectingdata']:
503 self._state = 'command-receiving-data'
504 self._activedata = util.bytesio()
505 # TODO signal request to run a command once we don't 561 # TODO signal request to run a command once we don't
506 # buffer data frames. 562 # buffer data frames.
507 return self._makewantframeresult() 563 return self._makewantframeresult()
508 else: 564 else:
509 self._state = 'waiting' 565 return self._makeruncommandresult(requestid)
510 return self._makeruncommandresult()
511 else: 566 else:
512 return self._makewantframeresult() 567 return self._makewantframeresult()
513 568
514 def _onframereceivingdata(self, requestid, frametype, frameflags, payload): 569 def _handlecommanddataframe(self, requestid, entry, frametype, frameflags,
515 if frametype != FRAME_TYPE_COMMAND_DATA: 570 payload):
516 self._state = 'errored' 571 assert frametype == FRAME_TYPE_COMMAND_DATA
517 return self._makeerrorresult(_('expected command data frame; '
518 'got %d') % frametype)
519 572
520 # TODO support streaming data instead of buffering it. 573 # TODO support streaming data instead of buffering it.
521 self._activedata.write(payload) 574 entry['data'].write(payload)
522 575
523 if frameflags & FLAG_COMMAND_DATA_CONTINUATION: 576 if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
524 return self._makewantframeresult() 577 return self._makewantframeresult()
525 elif frameflags & FLAG_COMMAND_DATA_EOS: 578 elif frameflags & FLAG_COMMAND_DATA_EOS:
526 self._activedata.seek(0) 579 entry['data'].seek(0)
527 self._state = 'idle' 580 return self._makeruncommandresult(requestid)
528 return self._makeruncommandresult()
529 else: 581 else:
530 self._state = 'errored' 582 self._state = 'errored'
531 return self._makeerrorresult(_('command data frame without ' 583 return self._makeerrorresult(_('command data frame without '
532 'flags')) 584 'flags'))
533 585