Mercurial > hg
comparison mercurial/wireprotov1peer.py @ 37631:2f626233859b
wireproto: implement batching on peer executor interface
This is a bit more complicated than non-batch requests because we
need to buffer sends until the last request arrives *and* we need
to support resolving futures as data arrives from the remote.
In a classical concurrent.futures executor model, the future
"starts" as soon as it is submitted. However, we have nothing to
start until the last command is submitted.
If we did nothing, calling result() would deadlock, since the future
hasn't "started." So in the case where we queue the command, we return
a special future type whose result() will trigger sendcommands().
This eliminates the deadlock potential. It also serves as a check
against callers who may be calling result() prematurely, as it will
prevent any subsequent callcommands() from working. This behavior
is slightly annoying and a bit restrictive. But it's the world
that half-duplex connections force on us.
In order to support streaming responses, we were previously using
a generator. But with a futures-based API, we're using futures
and not generators. So in order to get streaming, we need a
background thread to read data from the server.
The approach taken in this patch is to leverage the ThreadPoolExecutor
from concurrent.futures for managing a background thread. We create
an executor and future that resolves when all response data is
processed (or an error occurs). When exiting the context manager,
we wait on that background reading before returning.
I was hoping we could manually spin up a threading.Thread and this
would be simple. But I ran into a few deadlocks when implementing.
After looking at the source code of concurrent.futures, I figured
it would just be easier to use a ThreadPoolExecutor than implement
all the code needed to manually manage a thread.
To prove this works, a use of the batch API in discovery has been
updated.
Differential Revision: https://phab.mercurial-scm.org/D3269
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Fri, 13 Apr 2018 11:02:34 -0700 |
parents | e1b32dc4646c |
children | 33a6eee08db2 |
comparison
equal
deleted
inserted
replaced
37630:e1b32dc4646c | 37631:2f626233859b |
---|---|
7 | 7 |
8 from __future__ import absolute_import | 8 from __future__ import absolute_import |
9 | 9 |
10 import hashlib | 10 import hashlib |
11 import sys | 11 import sys |
12 import weakref | |
12 | 13 |
13 from .i18n import _ | 14 from .i18n import _ |
14 from .node import ( | 15 from .node import ( |
15 bin, | 16 bin, |
16 ) | 17 ) |
178 for k, v in argsdict.iteritems()) | 179 for k, v in argsdict.iteritems()) |
179 cmds.append('%s %s' % (op, args)) | 180 cmds.append('%s %s' % (op, args)) |
180 | 181 |
181 return ';'.join(cmds) | 182 return ';'.join(cmds) |
182 | 183 |
184 class unsentfuture(pycompat.futures.Future): | |
185 """A Future variation to represent an unsent command. | |
186 | |
187 Because we buffer commands and don't submit them immediately, calling | |
188 ``result()`` on an unsent future could deadlock. Futures for buffered | |
189 commands are represented by this type, which wraps ``result()`` to | |
190 call ``sendcommands()``. | |
191 """ | |
192 | |
193 def result(self, timeout=None): | |
194 if self.done(): | |
195 return pycompat.futures.Future.result(self, timeout) | |
196 | |
197 self._peerexecutor.sendcommands() | |
198 | |
199 # This looks like it will infinitely recurse. However, | |
200 # sendcommands() should modify __class__. This call serves as a check | |
201 # on that. | |
202 return self.result(timeout) | |
203 | |
183 @zi.implementer(repository.ipeercommandexecutor) | 204 @zi.implementer(repository.ipeercommandexecutor) |
184 class peerexecutor(object): | 205 class peerexecutor(object): |
185 def __init__(self, peer): | 206 def __init__(self, peer): |
186 self._peer = peer | 207 self._peer = peer |
187 self._sent = False | 208 self._sent = False |
188 self._closed = False | 209 self._closed = False |
189 self._calls = [] | 210 self._calls = [] |
211 self._futures = weakref.WeakSet() | |
212 self._responseexecutor = None | |
213 self._responsef = None | |
190 | 214 |
191 def __enter__(self): | 215 def __enter__(self): |
192 return self | 216 return self |
193 | 217 |
194 def __exit__(self, exctype, excvalee, exctb): | 218 def __exit__(self, exctype, excvalee, exctb): |
212 'on peer' % command) | 236 'on peer' % command) |
213 | 237 |
214 # Commands are either batchable or they aren't. If a command | 238 # Commands are either batchable or they aren't. If a command |
215 # isn't batchable, we send it immediately because the executor | 239 # isn't batchable, we send it immediately because the executor |
216 # can no longer accept new commands after a non-batchable command. | 240 # can no longer accept new commands after a non-batchable command. |
217 # If a command is batchable, we queue it for later. | 241 # If a command is batchable, we queue it for later. But we have |
242 # to account for the case of a non-batchable command arriving after | |
243 # a batchable one and refuse to service it. | |
244 | |
245 def addcall(): | |
246 f = pycompat.futures.Future() | |
247 self._futures.add(f) | |
248 self._calls.append((command, args, fn, f)) | |
249 return f | |
218 | 250 |
219 if getattr(fn, 'batchable', False): | 251 if getattr(fn, 'batchable', False): |
220 pass | 252 f = addcall() |
253 | |
254 # But since we don't issue it immediately, we wrap its result() | |
255 # to trigger sending so we avoid deadlocks. | |
256 f.__class__ = unsentfuture | |
257 f._peerexecutor = self | |
221 else: | 258 else: |
222 if self._calls: | 259 if self._calls: |
223 raise error.ProgrammingError( | 260 raise error.ProgrammingError( |
224 '%s is not batchable and cannot be called on a command ' | 261 '%s is not batchable and cannot be called on a command ' |
225 'executor along with other commands' % command) | 262 'executor along with other commands' % command) |
226 | 263 |
227 # We don't support batching yet. So resolve it immediately. | 264 f = addcall() |
228 f = pycompat.futures.Future() | 265 |
229 self._calls.append((command, args, fn, f)) | 266 # Non-batchable commands can never coexist with another command |
230 self.sendcommands() | 267 # in this executor. So send the command immediately. |
268 self.sendcommands() | |
269 | |
231 return f | 270 return f |
232 | 271 |
233 def sendcommands(self): | 272 def sendcommands(self): |
234 if self._sent: | 273 if self._sent: |
235 return | 274 return |
237 if not self._calls: | 276 if not self._calls: |
238 return | 277 return |
239 | 278 |
240 self._sent = True | 279 self._sent = True |
241 | 280 |
281 # Unhack any future types so caller sees a clean type and to break | |
282 # cycle between us and futures. | |
283 for f in self._futures: | |
284 if isinstance(f, unsentfuture): | |
285 f.__class__ = pycompat.futures.Future | |
286 f._peerexecutor = None | |
287 | |
242 calls = self._calls | 288 calls = self._calls |
243 # Mainly to destroy references to futures. | 289 # Mainly to destroy references to futures. |
244 self._calls = None | 290 self._calls = None |
245 | 291 |
292 # Simple case of a single command. We call it synchronously. | |
246 if len(calls) == 1: | 293 if len(calls) == 1: |
247 command, args, fn, f = calls[0] | 294 command, args, fn, f = calls[0] |
248 | 295 |
249 # Future was cancelled. Ignore it. | 296 # Future was cancelled. Ignore it. |
250 if not f.set_running_or_notify_cancel(): | 297 if not f.set_running_or_notify_cancel(): |
257 else: | 304 else: |
258 f.set_result(result) | 305 f.set_result(result) |
259 | 306 |
260 return | 307 return |
261 | 308 |
262 raise error.ProgrammingError('support for multiple commands not ' | 309 # Batch commands are a bit harder. First, we have to deal with the |
263 'yet implemented') | 310 # @batchable coroutine. That's a bit annoying. Furthermore, we also |
311 # need to preserve streaming. i.e. it should be possible for the | |
312 # futures to resolve as data is coming in off the wire without having | |
313 # to wait for the final byte of the final response. We do this by | |
314 # spinning up a thread to read the responses. | |
315 | |
316 requests = [] | |
317 states = [] | |
318 | |
319 for command, args, fn, f in calls: | |
320 # Future was cancelled. Ignore it. | |
321 if not f.set_running_or_notify_cancel(): | |
322 continue | |
323 | |
324 try: | |
325 batchable = fn.batchable(fn.__self__, | |
326 **pycompat.strkwargs(args)) | |
327 except Exception: | |
328 f.set_exception_info(*sys.exc_info()[1:]) | |
329 return | |
330 | |
331 # Encoded arguments and future holding remote result. | |
332 try: | |
333 encodedargs, fremote = next(batchable) | |
334 except Exception: | |
335 f.set_exception_info(*sys.exc_info()[1:]) | |
336 return | |
337 | |
338 requests.append((command, encodedargs)) | |
339 states.append((command, f, batchable, fremote)) | |
340 | |
341 if not requests: | |
342 return | |
343 | |
344 # This will emit responses in order they were executed. | |
345 wireresults = self._peer._submitbatch(requests) | |
346 | |
347 # The use of a thread pool executor here is a bit weird for something | |
348 # that only spins up a single thread. However, thread management is | |
349 # hard and it is easy to encounter race conditions, deadlocks, etc. | |
350 # concurrent.futures already solves these problems and its thread pool | |
351 # executor has minimal overhead. So we use it. | |
352 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) | |
353 self._responsef = self._responseexecutor.submit(self._readbatchresponse, | |
354 states, wireresults) | |
264 | 355 |
265 def close(self): | 356 def close(self): |
266 self.sendcommands() | 357 self.sendcommands() |
267 | 358 |
359 if self._closed: | |
360 return | |
361 | |
268 self._closed = True | 362 self._closed = True |
363 | |
364 if not self._responsef: | |
365 return | |
366 | |
367 # We need to wait on our in-flight response and then shut down the | |
368 # executor once we have a result. | |
369 try: | |
370 self._responsef.result() | |
371 finally: | |
372 self._responseexecutor.shutdown(wait=True) | |
373 self._responsef = None | |
374 self._responseexecutor = None | |
375 | |
376 # If any of our futures are still in progress, mark them as | |
377 # errored. Otherwise a result() could wait indefinitely. | |
378 for f in self._futures: | |
379 if not f.done(): | |
380 f.set_exception(error.ResponseError( | |
381 _('unfulfilled batch command response'))) | |
382 | |
383 self._futures = None | |
384 | |
385 def _readbatchresponse(self, states, wireresults): | |
386 # Executes in a thread to read data off the wire. | |
387 | |
388 for command, f, batchable, fremote in states: | |
389 # Grab raw result off the wire and teach the internal future | |
390 # about it. | |
391 remoteresult = next(wireresults) | |
392 fremote.set(remoteresult) | |
393 | |
394 # And ask the coroutine to decode that value. | |
395 try: | |
396 result = next(batchable) | |
397 except Exception: | |
398 f.set_exception_info(*sys.exc_info()[1:]) | |
399 else: | |
400 f.set_result(result) | |
269 | 401 |
270 class wirepeer(repository.legacypeer): | 402 class wirepeer(repository.legacypeer): |
271 """Client-side interface for communicating with a peer repository. | 403 """Client-side interface for communicating with a peer repository. |
272 | 404 |
273 Methods commonly call wire protocol commands of the same name. | 405 Methods commonly call wire protocol commands of the same name. |