mercurial/wireprotov1peer.py
changeset 37633 33a6eee08db2
parent 37631 2f626233859b
child 37635 cc8c06835097
equal deleted inserted replaced
37632:6c55ce51d6c3 37633:33a6eee08db2
    71     def set(self, value):
    71     def set(self, value):
    72         if util.safehasattr(self, 'value'):
    72         if util.safehasattr(self, 'value'):
    73             raise error.RepoError("future is already set")
    73             raise error.RepoError("future is already set")
    74         self.value = value
    74         self.value = value
    75 
    75 
    76 class batcher(object):
       
    77     '''base class for batches of commands submittable in a single request
       
    78 
       
    79     All methods invoked on instances of this class are simply queued and
       
    80     return a a future for the result. Once you call submit(), all the queued
       
    81     calls are performed and the results set in their respective futures.
       
    82     '''
       
    83     def __init__(self):
       
    84         self.calls = []
       
    85     def __getattr__(self, name):
       
    86         def call(*args, **opts):
       
    87             resref = future()
       
    88             # Please don't invent non-ascii method names, or you will
       
    89             # give core hg a very sad time.
       
    90             self.calls.append((name.encode('ascii'), args, opts, resref,))
       
    91             return resref
       
    92         return call
       
    93     def submit(self):
       
    94         raise NotImplementedError()
       
    95 
       
    96 class iterbatcher(batcher):
       
    97 
       
    98     def submit(self):
       
    99         raise NotImplementedError()
       
   100 
       
   101     def results(self):
       
   102         raise NotImplementedError()
       
   103 
       
   104 class remoteiterbatcher(iterbatcher):
       
   105     def __init__(self, remote):
       
   106         super(remoteiterbatcher, self).__init__()
       
   107         self._remote = remote
       
   108 
       
   109     def __getattr__(self, name):
       
   110         # Validate this method is batchable, since submit() only supports
       
   111         # batchable methods.
       
   112         fn = getattr(self._remote, name)
       
   113         if not getattr(fn, 'batchable', None):
       
   114             raise error.ProgrammingError('Attempted to batch a non-batchable '
       
   115                                          'call to %r' % name)
       
   116 
       
   117         return super(remoteiterbatcher, self).__getattr__(name)
       
   118 
       
   119     def submit(self):
       
   120         """Break the batch request into many patch calls and pipeline them.
       
   121 
       
   122         This is mostly valuable over http where request sizes can be
       
   123         limited, but can be used in other places as well.
       
   124         """
       
   125         # 2-tuple of (command, arguments) that represents what will be
       
   126         # sent over the wire.
       
   127         requests = []
       
   128 
       
   129         # 4-tuple of (command, final future, @batchable generator, remote
       
   130         # future).
       
   131         results = []
       
   132 
       
   133         for command, args, opts, finalfuture in self.calls:
       
   134             mtd = getattr(self._remote, command)
       
   135             batchable = mtd.batchable(mtd.__self__, *args, **opts)
       
   136 
       
   137             commandargs, fremote = next(batchable)
       
   138             assert fremote
       
   139             requests.append((command, commandargs))
       
   140             results.append((command, finalfuture, batchable, fremote))
       
   141 
       
   142         if requests:
       
   143             self._resultiter = self._remote._submitbatch(requests)
       
   144 
       
   145         self._results = results
       
   146 
       
   147     def results(self):
       
   148         for command, finalfuture, batchable, remotefuture in self._results:
       
   149             # Get the raw result, set it in the remote future, feed it
       
   150             # back into the @batchable generator so it can be decoded, and
       
   151             # set the result on the final future to this value.
       
   152             remoteresult = next(self._resultiter)
       
   153             remotefuture.set(remoteresult)
       
   154             finalfuture.set(next(batchable))
       
   155 
       
   156             # Verify our @batchable generators only emit 2 values.
       
   157             try:
       
   158                 next(batchable)
       
   159             except StopIteration:
       
   160                 pass
       
   161             else:
       
   162                 raise error.ProgrammingError('%s @batchable generator emitted '
       
   163                                              'unexpected value count' % command)
       
   164 
       
   165             yield finalfuture.value
       
   166 
       
   167 def encodebatchcmds(req):
    76 def encodebatchcmds(req):
   168     """Return a ``cmds`` argument value for the ``batch`` command."""
    77     """Return a ``cmds`` argument value for the ``batch`` command."""
   169     escapearg = wireprototypes.escapebatcharg
    78     escapearg = wireprototypes.escapebatcharg
   170 
    79 
   171     cmds = []
    80     cmds = []
   409     """
   318     """
   410     def commandexecutor(self):
   319     def commandexecutor(self):
   411         return peerexecutor(self)
   320         return peerexecutor(self)
   412 
   321 
   413     # Begin of ipeercommands interface.
   322     # Begin of ipeercommands interface.
   414 
       
   415     def iterbatch(self):
       
   416         return remoteiterbatcher(self)
       
   417 
   323 
   418     @batchable
   324     @batchable
   419     def lookup(self, key):
   325     def lookup(self, key):
   420         self.requirecap('lookup', _('look up remote revision'))
   326         self.requirecap('lookup', _('look up remote revision'))
   421         f = future()
   327         f = future()