comparison mercurial/wireprotov1peer.py @ 43076:2372284d9457

formatting: blacken the codebase

This is using my patch to black (https://github.com/psf/black/pull/826) so we don't un-wrap collection literals.

Done with:

  hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S

# skip-blame mass-reformatting only

# no-check-commit reformats foo_bar functions

Differential Revision: https://phab.mercurial-scm.org/D6971
author Augie Fackler <augie@google.com>
date Sun, 06 Oct 2019 09:45:02 -0400
parents 2c4f656c8e9f
children 687b865b95ad
comparison
equal deleted inserted replaced
43075:57875cf423c9 43076:2372284d9457
10 import hashlib 10 import hashlib
11 import sys 11 import sys
12 import weakref 12 import weakref
13 13
14 from .i18n import _ 14 from .i18n import _
15 from .node import ( 15 from .node import bin
16 bin,
17 )
18 from . import ( 16 from . import (
19 bundle2, 17 bundle2,
20 changegroup as changegroupmod, 18 changegroup as changegroupmod,
21 encoding, 19 encoding,
22 error, 20 error,
30 util as interfaceutil, 28 util as interfaceutil,
31 ) 29 )
32 30
33 urlreq = util.urlreq 31 urlreq = util.urlreq
34 32
33
35 def batchable(f): 34 def batchable(f):
36 '''annotation for batchable methods 35 '''annotation for batchable methods
37 36
38 Such methods must implement a coroutine as follows: 37 Such methods must implement a coroutine as follows:
39 38
52 The decorator returns a function which wraps this coroutine as a plain 51 The decorator returns a function which wraps this coroutine as a plain
53 method, but adds the original method as an attribute called "batchable", 52 method, but adds the original method as an attribute called "batchable",
54 which is used by remotebatch to split the call into separate encoding and 53 which is used by remotebatch to split the call into separate encoding and
55 decoding phases. 54 decoding phases.
56 ''' 55 '''
56
57 def plain(*args, **opts): 57 def plain(*args, **opts):
58 batchable = f(*args, **opts) 58 batchable = f(*args, **opts)
59 encargsorres, encresref = next(batchable) 59 encargsorres, encresref = next(batchable)
60 if not encresref: 60 if not encresref:
61 return encargsorres # a local result in this case 61 return encargsorres # a local result in this case
62 self = args[0] 62 self = args[0]
63 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr 63 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
64 encresref.set(self._submitone(cmd, encargsorres)) 64 encresref.set(self._submitone(cmd, encargsorres))
65 return next(batchable) 65 return next(batchable)
66
66 setattr(plain, 'batchable', f) 67 setattr(plain, 'batchable', f)
67 setattr(plain, '__name__', f.__name__) 68 setattr(plain, '__name__', f.__name__)
68 return plain 69 return plain
69 70
71
70 class future(object): 72 class future(object):
71 '''placeholder for a value to be set later''' 73 '''placeholder for a value to be set later'''
74
72 def set(self, value): 75 def set(self, value):
73 if util.safehasattr(self, 'value'): 76 if util.safehasattr(self, 'value'):
74 raise error.RepoError("future is already set") 77 raise error.RepoError("future is already set")
75 self.value = value 78 self.value = value
79
76 80
77 def encodebatchcmds(req): 81 def encodebatchcmds(req):
78 """Return a ``cmds`` argument value for the ``batch`` command.""" 82 """Return a ``cmds`` argument value for the ``batch`` command."""
79 escapearg = wireprototypes.escapebatcharg 83 escapearg = wireprototypes.escapebatcharg
80 84
83 # Old servers didn't properly unescape argument names. So prevent 87 # Old servers didn't properly unescape argument names. So prevent
84 # the sending of argument names that may not be decoded properly by 88 # the sending of argument names that may not be decoded properly by
85 # servers. 89 # servers.
86 assert all(escapearg(k) == k for k in argsdict) 90 assert all(escapearg(k) == k for k in argsdict)
87 91
88 args = ','.join('%s=%s' % (escapearg(k), escapearg(v)) 92 args = ','.join(
89 for k, v in argsdict.iteritems()) 93 '%s=%s' % (escapearg(k), escapearg(v))
94 for k, v in argsdict.iteritems()
95 )
90 cmds.append('%s %s' % (op, args)) 96 cmds.append('%s %s' % (op, args))
91 97
92 return ';'.join(cmds) 98 return ';'.join(cmds)
99
93 100
94 class unsentfuture(pycompat.futures.Future): 101 class unsentfuture(pycompat.futures.Future):
95 """A Future variation to represent an unsent command. 102 """A Future variation to represent an unsent command.
96 103
97 Because we buffer commands and don't submit them immediately, calling 104 Because we buffer commands and don't submit them immediately, calling
108 115
109 # This looks like it will infinitely recurse. However, 116 # This looks like it will infinitely recurse. However,
110 # sendcommands() should modify __class__. This call serves as a check 117 # sendcommands() should modify __class__. This call serves as a check
111 # on that. 118 # on that.
112 return self.result(timeout) 119 return self.result(timeout)
120
113 121
114 @interfaceutil.implementer(repository.ipeercommandexecutor) 122 @interfaceutil.implementer(repository.ipeercommandexecutor)
115 class peerexecutor(object): 123 class peerexecutor(object):
116 def __init__(self, peer): 124 def __init__(self, peer):
117 self._peer = peer 125 self._peer = peer
128 def __exit__(self, exctype, excvalee, exctb): 136 def __exit__(self, exctype, excvalee, exctb):
129 self.close() 137 self.close()
130 138
131 def callcommand(self, command, args): 139 def callcommand(self, command, args):
132 if self._sent: 140 if self._sent:
133 raise error.ProgrammingError('callcommand() cannot be used ' 141 raise error.ProgrammingError(
134 'after commands are sent') 142 'callcommand() cannot be used ' 'after commands are sent'
143 )
135 144
136 if self._closed: 145 if self._closed:
137 raise error.ProgrammingError('callcommand() cannot be used ' 146 raise error.ProgrammingError(
138 'after close()') 147 'callcommand() cannot be used ' 'after close()'
148 )
139 149
140 # Commands are dispatched through methods on the peer. 150 # Commands are dispatched through methods on the peer.
141 fn = getattr(self._peer, pycompat.sysstr(command), None) 151 fn = getattr(self._peer, pycompat.sysstr(command), None)
142 152
143 if not fn: 153 if not fn:
144 raise error.ProgrammingError( 154 raise error.ProgrammingError(
145 'cannot call command %s: method of same name not available ' 155 'cannot call command %s: method of same name not available '
146 'on peer' % command) 156 'on peer' % command
157 )
147 158
148 # Commands are either batchable or they aren't. If a command 159 # Commands are either batchable or they aren't. If a command
149 # isn't batchable, we send it immediately because the executor 160 # isn't batchable, we send it immediately because the executor
150 # can no longer accept new commands after a non-batchable command. 161 # can no longer accept new commands after a non-batchable command.
151 # If a command is batchable, we queue it for later. But we have 162 # If a command is batchable, we queue it for later. But we have
167 f._peerexecutor = self 178 f._peerexecutor = self
168 else: 179 else:
169 if self._calls: 180 if self._calls:
170 raise error.ProgrammingError( 181 raise error.ProgrammingError(
171 '%s is not batchable and cannot be called on a command ' 182 '%s is not batchable and cannot be called on a command '
172 'executor along with other commands' % command) 183 'executor along with other commands' % command
184 )
173 185
174 f = addcall() 186 f = addcall()
175 187
176 # Non-batchable commands can never coexist with another command 188 # Non-batchable commands can never coexist with another command
177 # in this executor. So send the command immediately. 189 # in this executor. So send the command immediately.
230 # Future was cancelled. Ignore it. 242 # Future was cancelled. Ignore it.
231 if not f.set_running_or_notify_cancel(): 243 if not f.set_running_or_notify_cancel():
232 continue 244 continue
233 245
234 try: 246 try:
235 batchable = fn.batchable(fn.__self__, 247 batchable = fn.batchable(
236 **pycompat.strkwargs(args)) 248 fn.__self__, **pycompat.strkwargs(args)
249 )
237 except Exception: 250 except Exception:
238 pycompat.future_set_exception_info(f, sys.exc_info()[1:]) 251 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
239 return 252 return
240 253
241 # Encoded arguments and future holding remote result. 254 # Encoded arguments and future holding remote result.
261 # that only spins up a single thread. However, thread management is 274 # that only spins up a single thread. However, thread management is
262 # hard and it is easy to encounter race conditions, deadlocks, etc. 275 # hard and it is easy to encounter race conditions, deadlocks, etc.
263 # concurrent.futures already solves these problems and its thread pool 276 # concurrent.futures already solves these problems and its thread pool
264 # executor has minimal overhead. So we use it. 277 # executor has minimal overhead. So we use it.
265 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) 278 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
266 self._responsef = self._responseexecutor.submit(self._readbatchresponse, 279 self._responsef = self._responseexecutor.submit(
267 states, wireresults) 280 self._readbatchresponse, states, wireresults
281 )
268 282
269 def close(self): 283 def close(self):
270 self.sendcommands() 284 self.sendcommands()
271 285
272 if self._closed: 286 if self._closed:
288 302
289 # If any of our futures are still in progress, mark them as 303 # If any of our futures are still in progress, mark them as
290 # errored. Otherwise a result() could wait indefinitely. 304 # errored. Otherwise a result() could wait indefinitely.
291 for f in self._futures: 305 for f in self._futures:
292 if not f.done(): 306 if not f.done():
293 f.set_exception(error.ResponseError( 307 f.set_exception(
294 _('unfulfilled batch command response'))) 308 error.ResponseError(
309 _('unfulfilled batch command response')
310 )
311 )
295 312
296 self._futures = None 313 self._futures = None
297 314
298 def _readbatchresponse(self, states, wireresults): 315 def _readbatchresponse(self, states, wireresults):
299 # Executes in a thread to read data off the wire. 316 # Executes in a thread to read data off the wire.
310 except Exception: 327 except Exception:
311 pycompat.future_set_exception_info(f, sys.exc_info()[1:]) 328 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
312 else: 329 else:
313 f.set_result(result) 330 f.set_result(result)
314 331
315 @interfaceutil.implementer(repository.ipeercommands, 332
316 repository.ipeerlegacycommands) 333 @interfaceutil.implementer(
334 repository.ipeercommands, repository.ipeerlegacycommands
335 )
317 class wirepeer(repository.peer): 336 class wirepeer(repository.peer):
318 """Client-side interface for communicating with a peer repository. 337 """Client-side interface for communicating with a peer repository.
319 338
320 Methods commonly call wire protocol commands of the same name. 339 Methods commonly call wire protocol commands of the same name.
321 340
322 See also httppeer.py and sshpeer.py for protocol-specific 341 See also httppeer.py and sshpeer.py for protocol-specific
323 implementations of this interface. 342 implementations of this interface.
324 """ 343 """
344
325 def commandexecutor(self): 345 def commandexecutor(self):
326 return peerexecutor(self) 346 return peerexecutor(self)
327 347
328 # Begin of ipeercommands interface. 348 # Begin of ipeercommands interface.
329 349
385 yield {}, None 405 yield {}, None
386 f = future() 406 f = future()
387 self.ui.debug('preparing listkeys for "%s"\n' % namespace) 407 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
388 yield {'namespace': encoding.fromlocal(namespace)}, f 408 yield {'namespace': encoding.fromlocal(namespace)}, f
389 d = f.value 409 d = f.value
390 self.ui.debug('received listkey for "%s": %i bytes\n' 410 self.ui.debug(
391 % (namespace, len(d))) 411 'received listkey for "%s": %i bytes\n' % (namespace, len(d))
412 )
392 yield pushkeymod.decodekeys(d) 413 yield pushkeymod.decodekeys(d)
393 414
394 @batchable 415 @batchable
395 def pushkey(self, namespace, key, old, new): 416 def pushkey(self, namespace, key, old, new):
396 if not self.capable('pushkey'): 417 if not self.capable('pushkey'):
397 yield False, None 418 yield False, None
398 f = future() 419 f = future()
399 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key)) 420 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
400 yield {'namespace': encoding.fromlocal(namespace), 421 yield {
401 'key': encoding.fromlocal(key), 422 'namespace': encoding.fromlocal(namespace),
402 'old': encoding.fromlocal(old), 423 'key': encoding.fromlocal(key),
403 'new': encoding.fromlocal(new)}, f 424 'old': encoding.fromlocal(old),
425 'new': encoding.fromlocal(new),
426 }, f
404 d = f.value 427 d = f.value
405 d, output = d.split('\n', 1) 428 d, output = d.split('\n', 1)
406 try: 429 try:
407 d = bool(int(d)) 430 d = bool(int(d))
408 except ValueError: 431 except ValueError:
409 raise error.ResponseError( 432 raise error.ResponseError(
410 _('push failed (unexpected response):'), d) 433 _('push failed (unexpected response):'), d
434 )
411 for l in output.splitlines(True): 435 for l in output.splitlines(True):
412 self.ui.status(_('remote: '), l) 436 self.ui.status(_('remote: '), l)
413 yield d 437 yield d
414 438
415 def stream_out(self): 439 def stream_out(self):
424 if value is None: 448 if value is None:
425 continue 449 continue
426 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key) 450 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
427 if keytype is None: 451 if keytype is None:
428 raise error.ProgrammingError( 452 raise error.ProgrammingError(
429 'Unexpectedly None keytype for key %s' % key) 453 'Unexpectedly None keytype for key %s' % key
454 )
430 elif keytype == 'nodes': 455 elif keytype == 'nodes':
431 value = wireprototypes.encodelist(value) 456 value = wireprototypes.encodelist(value)
432 elif keytype == 'csv': 457 elif keytype == 'csv':
433 value = ','.join(value) 458 value = ','.join(value)
434 elif keytype == 'scsv': 459 elif keytype == 'scsv':
435 value = ','.join(sorted(value)) 460 value = ','.join(sorted(value))
436 elif keytype == 'boolean': 461 elif keytype == 'boolean':
437 value = '%i' % bool(value) 462 value = '%i' % bool(value)
438 elif keytype != 'plain': 463 elif keytype != 'plain':
439 raise KeyError('unknown getbundle option type %s' 464 raise KeyError('unknown getbundle option type %s' % keytype)
440 % keytype)
441 opts[key] = value 465 opts[key] = value
442 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts)) 466 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
443 if any((cap.startswith('HG2') for cap in bundlecaps)): 467 if any((cap.startswith('HG2') for cap in bundlecaps)):
444 return bundle2.getunbundler(self.ui, f) 468 return bundle2.getunbundler(self.ui, f)
445 else: 469 else:
459 visible to hooks. 483 visible to hooks.
460 ''' 484 '''
461 485
462 if heads != ['force'] and self.capable('unbundlehash'): 486 if heads != ['force'] and self.capable('unbundlehash'):
463 heads = wireprototypes.encodelist( 487 heads = wireprototypes.encodelist(
464 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]) 488 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]
489 )
465 else: 490 else:
466 heads = wireprototypes.encodelist(heads) 491 heads = wireprototypes.encodelist(heads)
467 492
468 if util.safehasattr(bundle, 'deltaheader'): 493 if util.safehasattr(bundle, 'deltaheader'):
469 # this a bundle10, do the old style call sequence 494 # this a bundle10, do the old style call sequence
470 ret, output = self._callpush("unbundle", bundle, heads=heads) 495 ret, output = self._callpush("unbundle", bundle, heads=heads)
471 if ret == "": 496 if ret == "":
472 raise error.ResponseError( 497 raise error.ResponseError(_('push failed:'), output)
473 _('push failed:'), output)
474 try: 498 try:
475 ret = int(ret) 499 ret = int(ret)
476 except ValueError: 500 except ValueError:
477 raise error.ResponseError( 501 raise error.ResponseError(
478 _('push failed (unexpected response):'), ret) 502 _('push failed (unexpected response):'), ret
503 )
479 504
480 for l in output.splitlines(True): 505 for l in output.splitlines(True):
481 self.ui.status(_('remote: '), l) 506 self.ui.status(_('remote: '), l)
482 else: 507 else:
483 # bundle2 push. Send a stream, fetch a stream. 508 # bundle2 push. Send a stream, fetch a stream.
497 return br 522 return br
498 except ValueError: 523 except ValueError:
499 self._abort(error.ResponseError(_("unexpected response:"), d)) 524 self._abort(error.ResponseError(_("unexpected response:"), d))
500 525
501 def between(self, pairs): 526 def between(self, pairs):
502 batch = 8 # avoid giant requests 527 batch = 8 # avoid giant requests
503 r = [] 528 r = []
504 for i in pycompat.xrange(0, len(pairs), batch): 529 for i in pycompat.xrange(0, len(pairs), batch):
505 n = " ".join([wireprototypes.encodelist(p, '-') 530 n = " ".join(
506 for p in pairs[i:i + batch]]) 531 [
532 wireprototypes.encodelist(p, '-')
533 for p in pairs[i : i + batch]
534 ]
535 )
507 d = self._call("between", pairs=n) 536 d = self._call("between", pairs=n)
508 try: 537 try:
509 r.extend(l and wireprototypes.decodelist(l) or [] 538 r.extend(
510 for l in d.splitlines()) 539 l and wireprototypes.decodelist(l) or []
540 for l in d.splitlines()
541 )
511 except ValueError: 542 except ValueError:
512 self._abort(error.ResponseError(_("unexpected response:"), d)) 543 self._abort(error.ResponseError(_("unexpected response:"), d))
513 return r 544 return r
514 545
515 def changegroup(self, nodes, source): 546 def changegroup(self, nodes, source):
519 550
520 def changegroupsubset(self, bases, heads, source): 551 def changegroupsubset(self, bases, heads, source):
521 self.requirecap('changegroupsubset', _('look up remote changes')) 552 self.requirecap('changegroupsubset', _('look up remote changes'))
522 bases = wireprototypes.encodelist(bases) 553 bases = wireprototypes.encodelist(bases)
523 heads = wireprototypes.encodelist(heads) 554 heads = wireprototypes.encodelist(heads)
524 f = self._callcompressable("changegroupsubset", 555 f = self._callcompressable(
525 bases=bases, heads=heads) 556 "changegroupsubset", bases=bases, heads=heads
557 )
526 return changegroupmod.cg1unpacker(f, 'UN') 558 return changegroupmod.cg1unpacker(f, 'UN')
527 559
528 # End of ipeerlegacycommands interface. 560 # End of ipeerlegacycommands interface.
529 561
530 def _submitbatch(self, req): 562 def _submitbatch(self, req):