comparison mercurial/wireproto.py @ 28438:48fd02dac1d4

wireproto: make iterbatcher behave streamily over http(s)

Unfortunately, the ssh and http implementations are slightly different due to differences in their _callstream implementations, which prevents ssh from behaving streamily. We should probably introduce a new batch command that can stream results over ssh at some point in the near future.

The streamy behavior of batch over http(s) is an enormous win for remotefilelog over http: in my testing, it's saving about 40% on file fetches with a cold cache against a server on localhost.
author Augie Fackler <augie@google.com>
date Tue, 01 Mar 2016 18:41:43 -0500
parents 8d38eab2777a
children fd2acc5046f6
comparison
equal deleted inserted replaced
28437:c3eacee01c7e 28438:48fd02dac1d4
5 # This software may be used and distributed according to the terms of the 5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version. 6 # GNU General Public License version 2 or any later version.
7 7
8 from __future__ import absolute_import 8 from __future__ import absolute_import
9 9
10 import itertools
10 import os 11 import os
11 import sys 12 import sys
12 import tempfile 13 import tempfile
13 import urllib 14 import urllib
14 15
117 class remoteiterbatcher(peer.iterbatcher): 118 class remoteiterbatcher(peer.iterbatcher):
118 def __init__(self, remote): 119 def __init__(self, remote):
119 super(remoteiterbatcher, self).__init__() 120 super(remoteiterbatcher, self).__init__()
120 self._remote = remote 121 self._remote = remote
121 122
123 def __getattr__(self, name):
124 if not getattr(self._remote, name, False):
125 raise AttributeError(
126 'Attempted to iterbatch non-batchable call to %r' % name)
127 return super(remoteiterbatcher, self).__getattr__(name)
128
122 def submit(self): 129 def submit(self):
123 """Break the batch request into many patch calls and pipeline them. 130 """Break the batch request into many patch calls and pipeline them.
124 131
125 This is mostly valuable over http where request sizes can be 132 This is mostly valuable over http where request sizes can be
126 limited, but can be used in other places as well. 133 limited, but can be used in other places as well.
127 """ 134 """
128 rb = self._remote.batch() 135 req, rsp = [], []
129 rb.calls = self.calls 136 for name, args, opts, resref in self.calls:
130 rb.submit() 137 mtd = getattr(self._remote, name)
138 batchable = mtd.batchable(mtd.im_self, *args, **opts)
139 encargsorres, encresref = batchable.next()
140 assert encresref
141 req.append((name, encargsorres))
142 rsp.append((batchable, encresref))
143 if req:
144 self._resultiter = self._remote._submitbatch(req)
145 self._rsp = rsp
131 146
132 def results(self): 147 def results(self):
133 for name, args, opts, resref in self.calls: 148 for (batchable, encresref), encres in itertools.izip(
134 yield resref.value 149 self._rsp, self._resultiter):
150 encresref.set(encres)
151 yield batchable.next()
135 152
136 # Forward a couple of names from peer to make wireproto interactions 153 # Forward a couple of names from peer to make wireproto interactions
137 # slightly more sensible. 154 # slightly more sensible.
138 batchable = peer.batchable 155 batchable = peer.batchable
139 future = peer.future 156 future = peer.future
200 if self.capable('batch'): 217 if self.capable('batch'):
201 return remotebatch(self) 218 return remotebatch(self)
202 else: 219 else:
203 return peer.localbatch(self) 220 return peer.localbatch(self)
204 def _submitbatch(self, req): 221 def _submitbatch(self, req):
222 """run batch request <req> on the server
223
224 Returns an iterator of the raw responses from the server.
225 """
205 cmds = [] 226 cmds = []
206 for op, argsdict in req: 227 for op, argsdict in req:
207 args = ','.join('%s=%s' % (escapearg(k), escapearg(v)) 228 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
208 for k, v in argsdict.iteritems()) 229 for k, v in argsdict.iteritems())
209 cmds.append('%s %s' % (op, args)) 230 cmds.append('%s %s' % (op, args))
210 rsp = self._call("batch", cmds=';'.join(cmds)) 231 rsp = self._callstream("batch", cmds=';'.join(cmds))
211 return [unescapearg(r) for r in rsp.split(';')] 232 # TODO this response parsing is probably suboptimal for large
233 # batches with large responses.
234 work = rsp.read(1024)
235 chunk = work
236 while chunk:
237 while ';' in work:
238 one, work = work.split(';', 1)
239 yield unescapearg(one)
240 chunk = rsp.read(1024)
241 work += chunk
242 yield unescapearg(work)
243
212 def _submitone(self, op, args): 244 def _submitone(self, op, args):
213 return self._call(op, **args) 245 return self._call(op, **args)
214 246
215 def iterbatch(self): 247 def iterbatch(self):
216 return remoteiterbatcher(self) 248 return remoteiterbatcher(self)