520 |
520 |
521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests): |
521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests): |
522 reactor = wireprotoframing.clientreactor(hasmultiplesend=False, |
522 reactor = wireprotoframing.clientreactor(hasmultiplesend=False, |
523 buffersends=True) |
523 buffersends=True) |
524 |
524 |
|
525 handler = wireprotov2peer.clienthandler(ui, reactor) |
|
526 |
525 url = '%s/%s' % (apiurl, permission) |
527 url = '%s/%s' % (apiurl, permission) |
526 |
528 |
527 if len(requests) > 1: |
529 if len(requests) > 1: |
528 url += '/multirequest' |
530 url += '/multirequest' |
529 else: |
531 else: |
530 url += '/%s' % requests[0][0] |
532 url += '/%s' % requests[0][0] |
531 |
533 |
532 # Request ID to (request, future) |
|
533 requestmap = {} |
|
534 |
|
535 for command, args, f in requests: |
534 for command, args, f in requests: |
536 request, action, meta = reactor.callcommand(command, args) |
535 assert not list(handler.callcommand(command, args, f)) |
537 assert action == 'noop' |
|
538 |
|
539 requestmap[request.requestid] = (request, f) |
|
540 |
|
541 action, meta = reactor.flushcommands() |
|
542 assert action == 'sendframes' |
|
543 |
536 |
544 # TODO stream this. |
537 # TODO stream this. |
545 body = b''.join(map(bytes, meta['framegen'])) |
538 body = b''.join(map(bytes, handler.flushcommands())) |
546 |
539 |
547 # TODO modify user-agent to reflect v2 |
540 # TODO modify user-agent to reflect v2 |
548 headers = { |
541 headers = { |
549 r'Accept': wireprotov2server.FRAMINGTYPE, |
542 r'Accept': wireprotov2server.FRAMINGTYPE, |
550 r'Content-Type': wireprotov2server.FRAMINGTYPE, |
543 r'Content-Type': wireprotov2server.FRAMINGTYPE, |
562 raise |
555 raise |
563 except httplib.HTTPException as e: |
556 except httplib.HTTPException as e: |
564 ui.traceback() |
557 ui.traceback() |
565 raise IOError(None, e) |
558 raise IOError(None, e) |
566 |
559 |
567 return reactor, requestmap, res |
560 return handler, res |
568 |
561 |
569 class queuedcommandfuture(pycompat.futures.Future): |
562 class queuedcommandfuture(pycompat.futures.Future): |
570 """Wraps result() on command futures to trigger submission on call.""" |
563 """Wraps result() on command futures to trigger submission on call.""" |
571 |
564 |
572 def result(self, timeout=None): |
565 def result(self, timeout=None): |
682 permission = { |
675 permission = { |
683 'push': 'rw', |
676 'push': 'rw', |
684 'pull': 'ro', |
677 'pull': 'ro', |
685 }[permissions.pop()] |
678 }[permissions.pop()] |
686 |
679 |
687 reactor, requests, resp = sendv2request( |
680 handler, resp = sendv2request( |
688 self._ui, self._opener, self._requestbuilder, self._apiurl, |
681 self._ui, self._opener, self._requestbuilder, self._apiurl, |
689 permission, calls) |
682 permission, calls) |
690 |
683 |
691 # TODO we probably want to validate the HTTP code, media type, etc. |
684 # TODO we probably want to validate the HTTP code, media type, etc. |
692 |
685 |
693 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) |
686 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) |
694 self._responsef = self._responseexecutor.submit(self._handleresponse, |
687 self._responsef = self._responseexecutor.submit(self._handleresponse, |
695 reactor, |
688 handler, resp) |
696 requests, |
|
697 resp) |
|
698 |
689 |
699 def close(self): |
690 def close(self): |
700 if self._closed: |
691 if self._closed: |
701 return |
692 return |
702 |
693 |
721 f.set_exception(error.ResponseError( |
712 f.set_exception(error.ResponseError( |
722 _('unfulfilled command response'))) |
713 _('unfulfilled command response'))) |
723 |
714 |
724 self._futures = None |
715 self._futures = None |
725 |
716 |
726 def _handleresponse(self, reactor, requests, resp): |
717 def _handleresponse(self, handler, resp): |
727 # Called in a thread to read the response. |
718 # Called in a thread to read the response. |
728 |
719 |
729 results = {k: [] for k in requests} |
720 while handler.readframe(resp): |
730 |
721 pass |
731 while True: |
|
732 frame = wireprotoframing.readframe(resp) |
|
733 if frame is None: |
|
734 break |
|
735 |
|
736 self._ui.note(_('received %r\n') % frame) |
|
737 |
|
738 # Guard against receiving a frame with a request ID that we |
|
739 # didn't issue. This should never happen. |
|
740 request, f = requests.get(frame.requestid, [None, None]) |
|
741 |
|
742 action, meta = reactor.onframerecv(frame) |
|
743 |
|
744 if action == 'responsedata': |
|
745 assert request.requestid == meta['request'].requestid |
|
746 |
|
747 result = results[request.requestid] |
|
748 |
|
749 if meta['cbor']: |
|
750 payload = util.bytesio(meta['data']) |
|
751 |
|
752 decoder = cbor.CBORDecoder(payload) |
|
753 while payload.tell() + 1 < len(meta['data']): |
|
754 try: |
|
755 result.append(decoder.decode()) |
|
756 except Exception: |
|
757 pycompat.future_set_exception_info( |
|
758 f, sys.exc_info()[1:]) |
|
759 continue |
|
760 else: |
|
761 result.append(meta['data']) |
|
762 |
|
763 if meta['eos']: |
|
764 f.set_result(result) |
|
765 del results[request.requestid] |
|
766 |
|
767 elif action == 'error': |
|
768 e = error.RepoError(meta['message']) |
|
769 |
|
770 if f: |
|
771 f.set_exception(e) |
|
772 else: |
|
773 raise e |
|
774 |
|
775 else: |
|
776 e = error.ProgrammingError('unhandled action: %s' % action) |
|
777 |
|
778 if f: |
|
779 f.set_exception(e) |
|
780 else: |
|
781 raise e |
|
782 |
722 |
783 # TODO implement interface for version 2 peers |
723 # TODO implement interface for version 2 peers |
784 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, |
724 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, |
785 repository.ipeerrequests) |
725 repository.ipeerrequests) |
786 class httpv2peer(object): |
726 class httpv2peer(object): |