comparison mercurial/wireprotov1peer.py @ 37631:2f626233859b

wireproto: implement batching on peer executor interface. This is a bit more complicated than non-batch requests because we need to buffer sends until the last request arrives *and* we need to support resolving futures as data arrives from the remote. In a classical concurrent.futures executor model, the future "starts" as soon as it is submitted. However, we have nothing to start until the last command is submitted. If we did nothing, calling result() would deadlock, since the future hasn't "started." So in the case where we queue the command, we return a special future type whose result() will trigger sendcommands(). This eliminates the deadlock potential. It also serves as a check against callers who may be calling result() prematurely, as it will prevent any subsequent callcommands() from working. This behavior is slightly annoying and a bit restrictive. But it's the world that half duplex connections force on us. In order to support streaming responses, we were previously using a generator. But with a futures-based API, we're using futures and not generators. So in order to get streaming, we need a background thread to read data from the server. The approach taken in this patch is to leverage the ThreadPoolExecutor from concurrent.futures for managing a background thread. We create an executor and a future that resolves when all response data is processed (or an error occurs). When exiting the context manager, we wait on that background reading before returning. I was hoping we could manually spin up a threading.Thread and this would be simple. But I ran into a few deadlocks when implementing. After looking at the source code to concurrent.futures, I figured it would just be easier to use a ThreadPoolExecutor than implement all the code needed to manually manage a thread. To prove this works, a use of the batch API in discovery has been updated. Differential Revision: https://phab.mercurial-scm.org/D3269
author Gregory Szorc <gregory.szorc@gmail.com>
date Fri, 13 Apr 2018 11:02:34 -0700
parents e1b32dc4646c
children 33a6eee08db2
comparison
equal deleted inserted replaced
37630:e1b32dc4646c 37631:2f626233859b
7 7
8 from __future__ import absolute_import 8 from __future__ import absolute_import
9 9
10 import hashlib 10 import hashlib
11 import sys 11 import sys
12 import weakref
12 13
13 from .i18n import _ 14 from .i18n import _
14 from .node import ( 15 from .node import (
15 bin, 16 bin,
16 ) 17 )
178 for k, v in argsdict.iteritems()) 179 for k, v in argsdict.iteritems())
179 cmds.append('%s %s' % (op, args)) 180 cmds.append('%s %s' % (op, args))
180 181
181 return ';'.join(cmds) 182 return ';'.join(cmds)
182 183
184 class unsentfuture(pycompat.futures.Future):
185 """A Future variation to represent an unsent command.
186
187 Because we buffer commands and don't submit them immediately, calling
188 ``result()`` on an unsent future could deadlock. Futures for buffered
189 commands are represented by this type, which wraps ``result()`` to
190 call ``sendcommands()``.
191 """
192
193 def result(self, timeout=None):
194 if self.done():
195 return pycompat.futures.Future.result(self, timeout)
196
197 self._peerexecutor.sendcommands()
198
199 # This looks like it will infinitely recurse. However,
200 # sendcommands() should modify __class__. This call serves as a check
201 # on that.
202 return self.result(timeout)
203
183 @zi.implementer(repository.ipeercommandexecutor) 204 @zi.implementer(repository.ipeercommandexecutor)
184 class peerexecutor(object): 205 class peerexecutor(object):
185 def __init__(self, peer): 206 def __init__(self, peer):
186 self._peer = peer 207 self._peer = peer
187 self._sent = False 208 self._sent = False
188 self._closed = False 209 self._closed = False
189 self._calls = [] 210 self._calls = []
211 self._futures = weakref.WeakSet()
212 self._responseexecutor = None
213 self._responsef = None
190 214
191 def __enter__(self): 215 def __enter__(self):
192 return self 216 return self
193 217
194 def __exit__(self, exctype, excvalee, exctb): 218 def __exit__(self, exctype, excvalee, exctb):
212 'on peer' % command) 236 'on peer' % command)
213 237
214 # Commands are either batchable or they aren't. If a command 238 # Commands are either batchable or they aren't. If a command
215 # isn't batchable, we send it immediately because the executor 239 # isn't batchable, we send it immediately because the executor
216 # can no longer accept new commands after a non-batchable command. 240 # can no longer accept new commands after a non-batchable command.
217 # If a command is batchable, we queue it for later. 241 # If a command is batchable, we queue it for later. But we have
242 # to account for the case of a non-batchable command arriving after
243 # a batchable one and refuse to service it.
244
245 def addcall():
246 f = pycompat.futures.Future()
247 self._futures.add(f)
248 self._calls.append((command, args, fn, f))
249 return f
218 250
219 if getattr(fn, 'batchable', False): 251 if getattr(fn, 'batchable', False):
220 pass 252 f = addcall()
253
254 # But since we don't issue it immediately, we wrap its result()
255 # to trigger sending so we avoid deadlocks.
256 f.__class__ = unsentfuture
257 f._peerexecutor = self
221 else: 258 else:
222 if self._calls: 259 if self._calls:
223 raise error.ProgrammingError( 260 raise error.ProgrammingError(
224 '%s is not batchable and cannot be called on a command ' 261 '%s is not batchable and cannot be called on a command '
225 'executor along with other commands' % command) 262 'executor along with other commands' % command)
226 263
227 # We don't support batching yet. So resolve it immediately. 264 f = addcall()
228 f = pycompat.futures.Future() 265
229 self._calls.append((command, args, fn, f)) 266 # Non-batchable commands can never coexist with another command
230 self.sendcommands() 267 # in this executor. So send the command immediately.
268 self.sendcommands()
269
231 return f 270 return f
232 271
233 def sendcommands(self): 272 def sendcommands(self):
234 if self._sent: 273 if self._sent:
235 return 274 return
237 if not self._calls: 276 if not self._calls:
238 return 277 return
239 278
240 self._sent = True 279 self._sent = True
241 280
281 # Unhack any future types so caller sees a clean type and to break
282 # cycle between us and futures.
283 for f in self._futures:
284 if isinstance(f, unsentfuture):
285 f.__class__ = pycompat.futures.Future
286 f._peerexecutor = None
287
242 calls = self._calls 288 calls = self._calls
243 # Mainly to destroy references to futures. 289 # Mainly to destroy references to futures.
244 self._calls = None 290 self._calls = None
245 291
292 # Simple case of a single command. We call it synchronously.
246 if len(calls) == 1: 293 if len(calls) == 1:
247 command, args, fn, f = calls[0] 294 command, args, fn, f = calls[0]
248 295
249 # Future was cancelled. Ignore it. 296 # Future was cancelled. Ignore it.
250 if not f.set_running_or_notify_cancel(): 297 if not f.set_running_or_notify_cancel():
257 else: 304 else:
258 f.set_result(result) 305 f.set_result(result)
259 306
260 return 307 return
261 308
262 raise error.ProgrammingError('support for multiple commands not ' 309 # Batch commands are a bit harder. First, we have to deal with the
263 'yet implemented') 310 # @batchable coroutine. That's a bit annoying. Furthermore, we also
311 # need to preserve streaming. i.e. it should be possible for the
312 # futures to resolve as data is coming in off the wire without having
313 # to wait for the final byte of the final response. We do this by
314 # spinning up a thread to read the responses.
315
316 requests = []
317 states = []
318
319 for command, args, fn, f in calls:
320 # Future was cancelled. Ignore it.
321 if not f.set_running_or_notify_cancel():
322 continue
323
324 try:
325 batchable = fn.batchable(fn.__self__,
326 **pycompat.strkwargs(args))
327 except Exception:
328 f.set_exception_info(*sys.exc_info()[1:])
329 return
330
331 # Encoded arguments and future holding remote result.
332 try:
333 encodedargs, fremote = next(batchable)
334 except Exception:
335 f.set_exception_info(*sys.exc_info()[1:])
336 return
337
338 requests.append((command, encodedargs))
339 states.append((command, f, batchable, fremote))
340
341 if not requests:
342 return
343
344 # This will emit responses in order they were executed.
345 wireresults = self._peer._submitbatch(requests)
346
347 # The use of a thread pool executor here is a bit weird for something
348 # that only spins up a single thread. However, thread management is
349 # hard and it is easy to encounter race conditions, deadlocks, etc.
350 # concurrent.futures already solves these problems and its thread pool
351 # executor has minimal overhead. So we use it.
352 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
353 self._responsef = self._responseexecutor.submit(self._readbatchresponse,
354 states, wireresults)
264 355
265 def close(self): 356 def close(self):
266 self.sendcommands() 357 self.sendcommands()
267 358
359 if self._closed:
360 return
361
268 self._closed = True 362 self._closed = True
363
364 if not self._responsef:
365 return
366
367 # We need to wait on our in-flight response and then shut down the
368 # executor once we have a result.
369 try:
370 self._responsef.result()
371 finally:
372 self._responseexecutor.shutdown(wait=True)
373 self._responsef = None
374 self._responseexecutor = None
375
376 # If any of our futures are still in progress, mark them as
377 # errored. Otherwise a result() could wait indefinitely.
378 for f in self._futures:
379 if not f.done():
380 f.set_exception(error.ResponseError(
381 _('unfulfilled batch command response')))
382
383 self._futures = None
384
385 def _readbatchresponse(self, states, wireresults):
386 # Executes in a thread to read data off the wire.
387
388 for command, f, batchable, fremote in states:
389 # Grab raw result off the wire and teach the internal future
390 # about it.
391 remoteresult = next(wireresults)
392 fremote.set(remoteresult)
393
394 # And ask the coroutine to decode that value.
395 try:
396 result = next(batchable)
397 except Exception:
398 f.set_exception_info(*sys.exc_info()[1:])
399 else:
400 f.set_result(result)
269 401
270 class wirepeer(repository.legacypeer): 402 class wirepeer(repository.legacypeer):
271 """Client-side interface for communicating with a peer repository. 403 """Client-side interface for communicating with a peer repository.
272 404
273 Methods commonly call wire protocol commands of the same name. 405 Methods commonly call wire protocol commands of the same name.