mercurial/httppeer.py
changeset 37719 a656cba08a04
parent 37717 0664be4f0c1f
child 37736 e10b695b9c41
equal deleted inserted replaced
37718:ad1c07008e0b 37719:a656cba08a04
    11 import errno
    11 import errno
    12 import io
    12 import io
    13 import os
    13 import os
    14 import socket
    14 import socket
    15 import struct
    15 import struct
    16 import sys
       
    17 import tempfile
    16 import tempfile
    18 import weakref
    17 import weakref
    19 
    18 
    20 from .i18n import _
    19 from .i18n import _
    21 from .thirdparty import (
    20 from .thirdparty import (
    34     url as urlmod,
    33     url as urlmod,
    35     util,
    34     util,
    36     wireprotoframing,
    35     wireprotoframing,
    37     wireprototypes,
    36     wireprototypes,
    38     wireprotov1peer,
    37     wireprotov1peer,
       
    38     wireprotov2peer,
    39     wireprotov2server,
    39     wireprotov2server,
    40 )
    40 )
    41 
    41 
    42 httplib = util.httplib
    42 httplib = util.httplib
    43 urlerr = util.urlerr
    43 urlerr = util.urlerr
   520 
   520 
   521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
   521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
   522     reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
   522     reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
   523                                              buffersends=True)
   523                                              buffersends=True)
   524 
   524 
       
   525     handler = wireprotov2peer.clienthandler(ui, reactor)
       
   526 
   525     url = '%s/%s' % (apiurl, permission)
   527     url = '%s/%s' % (apiurl, permission)
   526 
   528 
   527     if len(requests) > 1:
   529     if len(requests) > 1:
   528         url += '/multirequest'
   530         url += '/multirequest'
   529     else:
   531     else:
   530         url += '/%s' % requests[0][0]
   532         url += '/%s' % requests[0][0]
   531 
   533 
   532     # Request ID to (request, future)
       
   533     requestmap = {}
       
   534 
       
   535     for command, args, f in requests:
   534     for command, args, f in requests:
   536         request, action, meta = reactor.callcommand(command, args)
   535         assert not list(handler.callcommand(command, args, f))
   537         assert action == 'noop'
       
   538 
       
   539         requestmap[request.requestid] = (request, f)
       
   540 
       
   541     action, meta = reactor.flushcommands()
       
   542     assert action == 'sendframes'
       
   543 
   536 
   544     # TODO stream this.
   537     # TODO stream this.
   545     body = b''.join(map(bytes, meta['framegen']))
   538     body = b''.join(map(bytes, handler.flushcommands()))
   546 
   539 
   547     # TODO modify user-agent to reflect v2
   540     # TODO modify user-agent to reflect v2
   548     headers = {
   541     headers = {
   549         r'Accept': wireprotov2server.FRAMINGTYPE,
   542         r'Accept': wireprotov2server.FRAMINGTYPE,
   550         r'Content-Type': wireprotov2server.FRAMINGTYPE,
   543         r'Content-Type': wireprotov2server.FRAMINGTYPE,
   562         raise
   555         raise
   563     except httplib.HTTPException as e:
   556     except httplib.HTTPException as e:
   564         ui.traceback()
   557         ui.traceback()
   565         raise IOError(None, e)
   558         raise IOError(None, e)
   566 
   559 
   567     return reactor, requestmap, res
   560     return handler, res
   568 
   561 
   569 class queuedcommandfuture(pycompat.futures.Future):
   562 class queuedcommandfuture(pycompat.futures.Future):
   570     """Wraps result() on command futures to trigger submission on call."""
   563     """Wraps result() on command futures to trigger submission on call."""
   571 
   564 
   572     def result(self, timeout=None):
   565     def result(self, timeout=None):
   682         permission = {
   675         permission = {
   683             'push': 'rw',
   676             'push': 'rw',
   684             'pull': 'ro',
   677             'pull': 'ro',
   685         }[permissions.pop()]
   678         }[permissions.pop()]
   686 
   679 
   687         reactor, requests, resp = sendv2request(
   680         handler, resp = sendv2request(
   688             self._ui, self._opener, self._requestbuilder, self._apiurl,
   681             self._ui, self._opener, self._requestbuilder, self._apiurl,
   689             permission, calls)
   682             permission, calls)
   690 
   683 
   691         # TODO we probably want to validate the HTTP code, media type, etc.
   684         # TODO we probably want to validate the HTTP code, media type, etc.
   692 
   685 
   693         self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
   686         self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
   694         self._responsef = self._responseexecutor.submit(self._handleresponse,
   687         self._responsef = self._responseexecutor.submit(self._handleresponse,
   695                                                         reactor,
   688                                                         handler, resp)
   696                                                         requests,
       
   697                                                         resp)
       
   698 
   689 
   699     def close(self):
   690     def close(self):
   700         if self._closed:
   691         if self._closed:
   701             return
   692             return
   702 
   693 
   721                     f.set_exception(error.ResponseError(
   712                     f.set_exception(error.ResponseError(
   722                         _('unfulfilled command response')))
   713                         _('unfulfilled command response')))
   723 
   714 
   724             self._futures = None
   715             self._futures = None
   725 
   716 
   726     def _handleresponse(self, reactor, requests, resp):
   717     def _handleresponse(self, handler, resp):
   727         # Called in a thread to read the response.
   718         # Called in a thread to read the response.
   728 
   719 
   729         results = {k: [] for k in requests}
   720         while handler.readframe(resp):
   730 
   721             pass
   731         while True:
       
   732             frame = wireprotoframing.readframe(resp)
       
   733             if frame is None:
       
   734                 break
       
   735 
       
   736             self._ui.note(_('received %r\n') % frame)
       
   737 
       
   738             # Guard against receiving a frame with a request ID that we
       
   739             # didn't issue. This should never happen.
       
   740             request, f = requests.get(frame.requestid, [None, None])
       
   741 
       
   742             action, meta = reactor.onframerecv(frame)
       
   743 
       
   744             if action == 'responsedata':
       
   745                 assert request.requestid == meta['request'].requestid
       
   746 
       
   747                 result = results[request.requestid]
       
   748 
       
   749                 if meta['cbor']:
       
   750                     payload = util.bytesio(meta['data'])
       
   751 
       
   752                     decoder = cbor.CBORDecoder(payload)
       
   753                     while payload.tell() + 1 < len(meta['data']):
       
   754                         try:
       
   755                             result.append(decoder.decode())
       
   756                         except Exception:
       
   757                             pycompat.future_set_exception_info(
       
   758                                 f, sys.exc_info()[1:])
       
   759                             continue
       
   760                 else:
       
   761                     result.append(meta['data'])
       
   762 
       
   763                 if meta['eos']:
       
   764                     f.set_result(result)
       
   765                     del results[request.requestid]
       
   766 
       
   767             elif action == 'error':
       
   768                 e = error.RepoError(meta['message'])
       
   769 
       
   770                 if f:
       
   771                     f.set_exception(e)
       
   772                 else:
       
   773                     raise e
       
   774 
       
   775             else:
       
   776                 e = error.ProgrammingError('unhandled action: %s' % action)
       
   777 
       
   778                 if f:
       
   779                     f.set_exception(e)
       
   780                 else:
       
   781                     raise e
       
   782 
   722 
   783 # TODO implement interface for version 2 peers
   723 # TODO implement interface for version 2 peers
   784 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
   724 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
   785                 repository.ipeerrequests)
   725                 repository.ipeerrequests)
   786 class httpv2peer(object):
   726 class httpv2peer(object):