comparison mercurial/httppeer.py @ 37651:950294e28136

httppeer: implement command executor for version 2 peer

Now that we have a new API for issuing commands which is compatible with wire protocol version 2, we can start using it with wire protocol version 2.

This commit replaces our hacky implementation of _call() with something a bit more robust based on the new command executor interface.

We now have proper support for issuing multiple commands per HTTP request. Each HTTP request maintains its own client reactor.

The implementation is similar to the one in the legacy wire protocol. We use a ThreadPoolExecutor for spinning up a thread to read the HTTP response in the background. This allows responses to resolve in any order. While not implemented on the server yet, a client could use concurrent.futures.as_completed() with a collection of futures and handle responses as they arrive from the server.

The return value from issued commands is still a simple list of raw or decoded CBOR data. This is still super hacky. We will want a rich data type for representing command responses. But at least this commit gets us one step closer to a proper peer implementation.

Differential Revision: https://phab.mercurial-scm.org/D3297
author Gregory Szorc <gregory.szorc@gmail.com>
date Fri, 13 Apr 2018 12:30:04 -0700
parents 77c9ee77687c
children 8cea0d57bf37
comparison
equal deleted inserted replaced
37650:62ebfda864de 37651:950294e28136
11 import errno 11 import errno
12 import io 12 import io
13 import os 13 import os
14 import socket 14 import socket
15 import struct 15 import struct
16 import sys
16 import tempfile 17 import tempfile
18 import weakref
17 19
18 from .i18n import _ 20 from .i18n import _
19 from .thirdparty import ( 21 from .thirdparty import (
20 cbor, 22 cbor,
21 ) 23 )
29 pycompat, 31 pycompat,
30 repository, 32 repository,
31 statichttprepo, 33 statichttprepo,
32 url as urlmod, 34 url as urlmod,
33 util, 35 util,
34 wireproto,
35 wireprotoframing, 36 wireprotoframing,
36 wireprototypes, 37 wireprototypes,
37 wireprotov1peer, 38 wireprotov1peer,
38 wireprotov2server, 39 wireprotov2server,
39 ) 40 )
515 return self._callstream(cmd, _compressible=True, **args) 516 return self._callstream(cmd, _compressible=True, **args)
516 517
517 def _abort(self, exception): 518 def _abort(self, exception):
518 raise exception 519 raise exception
519 520
def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
    """Send wire protocol version 2 commands in a single HTTP request.

    ``requests`` is a sequence of ``(command, args, future)`` tuples. Each
    command is registered with a freshly created client reactor, the frames
    the reactor emits are concatenated into one request body, and the body
    is sent to a URL derived from ``apiurl`` and ``permission``.

    Returns a ``(reactor, requestmap, response)`` tuple. ``requestmap`` maps
    every issued request ID to its ``(request, future)`` pair and
    ``response`` is the object returned by ``opener.open()``.

    Raises ``error.Abort`` when the server answers with HTTP 401 and
    ``IOError`` when the HTTP library raises ``HTTPException``. Any other
    HTTP error is re-raised unchanged.
    """
    reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
                                             buffersends=True)

    # A single command is addressed by its own name; several commands go to
    # the shared "multirequest" endpoint.
    baseurl = '%s/%s' % (apiurl, permission)
    multiple = len(requests) > 1
    url = baseurl + ('/multirequest' if multiple else '/%s' % requests[0][0])

    # Request ID to (request, future)
    requestmap = {}

    for name, cmdargs, future in requests:
        request, action, meta = reactor.callcommand(name, cmdargs)
        # Sends are buffered, so queuing a command must not emit anything.
        assert action == 'noop'

        requestmap[request.requestid] = (request, future)

    action, meta = reactor.flushcommands()
    assert action == 'sendframes'

    # TODO stream this.
    body = b''.join(bytes(frame) for frame in meta['framegen'])

    # TODO modify user-agent to reflect v2
    framingtype = wireprotov2server.FRAMINGTYPE
    headers = {
        r'Accept': framingtype,
        r'Content-Type': framingtype,
    }

    req = requestbuilder(pycompat.strurl(url), body, headers)
    req.add_unredirected_header(r'Content-Length', r'%d' % len(body))

    try:
        res = opener.open(req)
    except urlerr.httperror as e:
        # Only an authorization failure gets a friendlier error.
        if e.code != 401:
            raise

        raise error.Abort(_('authorization failed'))
    except httplib.HTTPException as e:
        ui.traceback()
        raise IOError(None, e)

    return reactor, requestmap, res
568
class queuedcommandfuture(pycompat.futures.Future):
    """Future whose ``result()`` first submits the queued commands.

    Instances carry a ``_peerexecutor`` attribute referring to the executor
    that queued the command.
    """

    def result(self, timeout=None):
        """Return the command result, sending queued commands if needed."""
        if not self.done():
            self._peerexecutor.sendcommands()

            # sendcommands() will restore the original __class__ and
            # self.result will resolve to Future.result.
            return self.result(timeout)

        return pycompat.futures.Future.result(self, timeout)
581
@zi.implementer(repository.ipeercommandexecutor)
class httpv2executor(object):
    """Command executor speaking wire protocol version 2 over HTTP.

    Commands are queued with ``callcommand()``, which hands back one future
    per command. ``sendcommands()`` submits everything queued so far in a
    single HTTP request; it is also invoked by ``close()`` (and therefore on
    leaving a ``with`` block) and by ``result()`` on a still-queued future.
    The HTTP response is read on a background thread that resolves the
    futures.
    """

    def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
        self._ui = ui
        self._opener = opener
        self._requestbuilder = requestbuilder
        self._apiurl = apiurl
        # API descriptor; its 'commands' mapping is used to validate calls.
        self._descriptor = descriptor
        self._sent = False
        self._closed = False
        # Union of the permissions required by every queued command.
        self._neededpermissions = set()
        # Queued (command, args, future) tuples; None once they are sent.
        self._calls = []
        # Weak references so the executor does not keep futures alive.
        self._futures = weakref.WeakSet()
        # Thread pool and the future of the response-reading task.
        self._responseexecutor = None
        self._responsef = None

    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalue, exctb):
        self.close()

    def callcommand(self, command, args):
        """Queue ``command`` with ``args`` and return a future for its result.

        Raises ``error.ProgrammingError`` if commands were already sent, if
        the executor is closed, if the command is not advertised in the API
        descriptor, or if ``args`` contains a name the command does not
        declare.
        """
        if self._sent:
            raise error.ProgrammingError('callcommand() cannot be used after '
                                         'commands are sent')

        if self._closed:
            raise error.ProgrammingError('callcommand() cannot be used after '
                                         'close()')

        # The service advertises which commands are available. So if we attempt
        # to call an unknown command or pass an unknown argument, we can screen
        # for this.
        if command not in self._descriptor['commands']:
            raise error.ProgrammingError(
                'wire protocol command %s is not available' % command)

        cmdinfo = self._descriptor['commands'][command]
        unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))

        if unknownargs:
            raise error.ProgrammingError(
                'wire protocol command %s does not accept argument: %s' % (
                    command, ', '.join(sorted(unknownargs))))

        self._neededpermissions |= set(cmdinfo['permissions'])

        # TODO we /could/ also validate types here, since the API descriptor
        # includes types...

        f = pycompat.futures.Future()

        # Monkeypatch it so result() triggers sendcommands(), otherwise result()
        # could deadlock.
        f.__class__ = queuedcommandfuture
        f._peerexecutor = self

        self._futures.add(f)
        self._calls.append((command, args, f))

        return f

    def sendcommands(self):
        """Send all queued commands in one HTTP request.

        Does nothing if commands were already sent or none are queued.
        Cancelled futures are dropped from the request. On success a
        background task reading the response is started.

        Raises ``error.RepoError`` when the queued commands need more than
        one permission (``push`` together with ``pull`` counts as ``push``).
        """
        if self._sent:
            return

        if not self._calls:
            return

        self._sent = True

        # Unhack any future types so caller sees a clean type and so we
        # break reference cycle.
        for f in self._futures:
            if isinstance(f, queuedcommandfuture):
                f.__class__ = pycompat.futures.Future
                f._peerexecutor = None

        # Mark the future as running and filter out cancelled futures.
        calls = [(command, args, f)
                 for command, args, f in self._calls
                 if f.set_running_or_notify_cancel()]

        # Clear out references, prevent improper object usage.
        self._calls = None

        if not calls:
            return

        permissions = set(self._neededpermissions)

        # Push access covers pull, so only the stronger one is requested.
        if 'push' in permissions and 'pull' in permissions:
            permissions.remove('pull')

        if len(permissions) > 1:
            raise error.RepoError(_('cannot make request requiring multiple '
                                    'permissions: %s') %
                                  _(', ').join(sorted(permissions)))

        # Map the permission name onto the URL path component.
        permission = {
            'push': 'rw',
            'pull': 'ro',
        }[permissions.pop()]

        reactor, requests, resp = sendv2request(
            self._ui, self._opener, self._requestbuilder, self._apiurl,
            permission, calls)

        # TODO we probably want to validate the HTTP code, media type, etc.

        self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
        self._responsef = self._responseexecutor.submit(self._handleresponse,
                                                        reactor,
                                                        requests,
                                                        resp)

    def close(self):
        """Send pending commands, wait for the response and release resources.

        Calling it again is a no-op. Any exception raised while reading the
        response propagates to the caller. Futures still unresolved once the
        response has been consumed are failed with ``error.ResponseError``.
        """
        if self._closed:
            return

        self.sendcommands()

        self._closed = True

        # Nothing was sent, so there is no response to wait for.
        if not self._responsef:
            return

        try:
            self._responsef.result()
        finally:
            self._responseexecutor.shutdown(wait=True)
            self._responsef = None
            self._responseexecutor = None

            # If any of our futures are still in progress, mark them as
            # errored, otherwise a result() could wait indefinitely. This
            # runs even when reading the response failed.
            for f in self._futures:
                if not f.done():
                    f.set_exception(error.ResponseError(
                        _('unfulfilled command response')))

            self._futures = None

    @staticmethod
    def _failfuture(f, exc, tb):
        """Resolve ``f`` with ``exc`` unless it has already been resolved."""
        if f.done():
            return

        # The Python 2 ``futures`` backport offers set_exception_info(),
        # which preserves the traceback; the standard library Future only
        # has set_exception().
        setinfo = getattr(f, 'set_exception_info', None)
        if setinfo is not None:
            setinfo(exc, tb)
        else:
            f.set_exception(exc)

    def _handleresponse(self, reactor, requests, resp):
        """Read frames from ``resp`` and resolve the command futures.

        Called in a thread to read the response. ``requests`` maps request
        IDs to ``(request, future)`` pairs. Response data is accumulated per
        request and each future is resolved with the accumulated list when
        its end-of-stream frame arrives.
        """
        results = {k: [] for k in requests}

        while True:
            frame = wireprotoframing.readframe(resp)
            if frame is None:
                break

            self._ui.note(_('received %r\n') % frame)

            # Look up the request this frame belongs to. Both values are
            # None for a request ID that we didn't issue, which should never
            # happen.
            request, f = requests.get(frame.requestid, [None, None])

            action, meta = reactor.onframerecv(frame)

            if action == 'responsedata':
                if request is None:
                    # Fail explicitly instead of dereferencing None below.
                    raise error.ProgrammingError(
                        'received response data for unknown request ID: %s'
                        % frame.requestid)

                assert request.requestid == meta['request'].requestid

                result = results[request.requestid]

                if meta['cbor']:
                    payload = util.bytesio(meta['data'])

                    decoder = cbor.CBORDecoder(payload)
                    # NOTE(review): because of the "+ 1", a final item that
                    # occupies a single byte is never decoded - confirm this
                    # is intended.
                    while payload.tell() + 1 < len(meta['data']):
                        try:
                            result.append(decoder.decode())
                        except Exception:
                            self._failfuture(f, *sys.exc_info()[1:])
                            # Stop decoding this frame: the rest of the
                            # payload cannot be trusted and retrying could
                            # loop forever if the decoder did not advance.
                            break
                else:
                    result.append(meta['data'])

                if meta['eos']:
                    # A future that already failed must not be resolved
                    # a second time.
                    if not f.done():
                        f.set_result(result)
                    del results[request.requestid]

            else:
                e = error.ProgrammingError('unhandled action: %s' % action)

                if f:
                    f.set_exception(e)
                else:
                    raise e
773
520 # TODO implement interface for version 2 peers 774 # TODO implement interface for version 2 peers
521 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities) 775 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
776 repository.ipeerrequests)
522 class httpv2peer(object): 777 class httpv2peer(object):
523 def __init__(self, ui, repourl, apipath, opener, requestbuilder, 778 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
524 apidescriptor): 779 apidescriptor):
525 self.ui = ui 780 self.ui = ui
526 781
527 if repourl.endswith('/'): 782 if repourl.endswith('/'):
528 repourl = repourl[:-1] 783 repourl = repourl[:-1]
529 784
530 self._url = repourl 785 self._url = repourl
531 self._apipath = apipath 786 self._apipath = apipath
787 self._apiurl = '%s/%s' % (repourl, apipath)
532 self._opener = opener 788 self._opener = opener
533 self._requestbuilder = requestbuilder 789 self._requestbuilder = requestbuilder
534 self._descriptor = apidescriptor 790 self._descriptor = apidescriptor
535 791
536 # Start of ipeerconnection. 792 # Start of ipeerconnection.
578 _('cannot %s; client or remote repository does not support the %r ' 834 _('cannot %s; client or remote repository does not support the %r '
579 'capability') % (purpose, name)) 835 'capability') % (purpose, name))
580 836
581 # End of ipeercapabilities. 837 # End of ipeercapabilities.
582 838
583 # TODO require to be part of a batched primitive, use futures.
584 def _call(self, name, **args): 839 def _call(self, name, **args):
585 """Call a wire protocol command with arguments.""" 840 with self.commandexecutor() as e:
586 841 return e.callcommand(name, args).result()
587 # Having this early has a side-effect of importing wireprotov2server, 842
588 # which has the side-effect of ensuring commands are registered. 843 def commandexecutor(self):
589 844 return httpv2executor(self.ui, self._opener, self._requestbuilder,
590 # TODO modify user-agent to reflect v2. 845 self._apiurl, self._descriptor)
591 headers = {
592 r'Accept': wireprotov2server.FRAMINGTYPE,
593 r'Content-Type': wireprotov2server.FRAMINGTYPE,
594 }
595
596 # TODO permissions should come from capabilities results.
597 permission = wireproto.commandsv2[name].permission
598 if permission not in ('push', 'pull'):
599 raise error.ProgrammingError('unknown permission type: %s' %
600 permission)
601
602 permission = {
603 'push': 'rw',
604 'pull': 'ro',
605 }[permission]
606
607 url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name)
608
609 # TODO this should be part of a generic peer for the frame-based
610 # protocol.
611 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
612 buffersends=True)
613
614 request, action, meta = reactor.callcommand(name, args)
615 assert action == 'noop'
616
617 action, meta = reactor.flushcommands()
618 assert action == 'sendframes'
619
620 body = b''.join(map(bytes, meta['framegen']))
621 req = self._requestbuilder(pycompat.strurl(url), body, headers)
622 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
623
624 # TODO unify this code with httppeer.
625 try:
626 res = self._opener.open(req)
627 except urlerr.httperror as e:
628 if e.code == 401:
629 raise error.Abort(_('authorization failed'))
630
631 raise
632 except httplib.HTTPException as e:
633 self.ui.traceback()
634 raise IOError(None, e)
635
636 # TODO validate response type, wrap response to handle I/O errors.
637 # TODO more robust frame receiver.
638 results = []
639
640 while True:
641 frame = wireprotoframing.readframe(res)
642 if frame is None:
643 break
644
645 self.ui.note(_('received %r\n') % frame)
646
647 action, meta = reactor.onframerecv(frame)
648
649 if action == 'responsedata':
650 if meta['cbor']:
651 payload = util.bytesio(meta['data'])
652
653 decoder = cbor.CBORDecoder(payload)
654 while payload.tell() + 1 < len(meta['data']):
655 results.append(decoder.decode())
656 else:
657 results.append(meta['data'])
658 else:
659 error.ProgrammingError('unhandled action: %s' % action)
660
661 return results
662 846
663 # Registry of API service names to metadata about peers that handle it. 847 # Registry of API service names to metadata about peers that handle it.
664 # 848 #
665 # The following keys are meaningful: 849 # The following keys are meaningful:
666 # 850 #