comparison mercurial/wireproto.py @ 33806:dedab036215d

wireproto: use new peer interface The wirepeer class provides concrete implementations of peer interface methods for calling wire protocol commands. It makes sense for this class to inherit from the peer abstract base class. So we change that. Since httppeer and sshpeer have already been converted to the new interface, peerrepository is no longer adding any value. So it has been removed. httppeer and sshpeer have been updated to reflect the loss of peerrepository and the inheritance of the abstract base class in wirepeer. The code changes in wirepeer are reordering of methods to group by interface. Some Python code in tests was updated to reflect changed APIs. .. api:: peer.peerrepository has been removed. Use repository.peer abstract base class to represent a peer repository. Differential Revision: https://phab.mercurial-scm.org/D338
author Gregory Szorc <gregory.szorc@gmail.com>
date Thu, 10 Aug 2017 20:58:28 -0700
parents b47fe9733d76
children 6c6169f71b8d
comparison
equal deleted inserted replaced
33805:f913e90f15a0 33806:dedab036215d
25 error, 25 error,
26 exchange, 26 exchange,
27 peer, 27 peer,
28 pushkey as pushkeymod, 28 pushkey as pushkeymod,
29 pycompat, 29 pycompat,
30 repository,
30 streamclone, 31 streamclone,
31 util, 32 util,
32 ) 33 )
33 34
34 urlerr = util.urlerr 35 urlerr = util.urlerr
210 'cg': 'boolean', 211 'cg': 'boolean',
211 'cbattempted': 'boolean'} 212 'cbattempted': 'boolean'}
212 213
213 # client side 214 # client side
214 215
215 class wirepeer(peer.peerrepository): 216 class wirepeer(repository.legacypeer):
216 """Client-side interface for communicating with a peer repository. 217 """Client-side interface for communicating with a peer repository.
217 218
218 Methods commonly call wire protocol commands of the same name. 219 Methods commonly call wire protocol commands of the same name.
219 220
220 See also httppeer.py and sshpeer.py for protocol-specific 221 See also httppeer.py and sshpeer.py for protocol-specific
221 implementations of this interface. 222 implementations of this interface.
222 """ 223 """
223 def _submitbatch(self, req): 224 # Begin of basewirepeer interface.
224 """run batch request <req> on the server
225
226 Returns an iterator of the raw responses from the server.
227 """
228 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
229 chunk = rsp.read(1024)
230 work = [chunk]
231 while chunk:
232 while ';' not in chunk and chunk:
233 chunk = rsp.read(1024)
234 work.append(chunk)
235 merged = ''.join(work)
236 while ';' in merged:
237 one, merged = merged.split(';', 1)
238 yield unescapearg(one)
239 chunk = rsp.read(1024)
240 work = [merged, chunk]
241 yield unescapearg(''.join(work))
242
243 def _submitone(self, op, args):
244 return self._call(op, **args)
245 225
246 def iterbatch(self): 226 def iterbatch(self):
247 return remoteiterbatcher(self) 227 return remoteiterbatcher(self)
248 228
249 @batchable 229 @batchable
291 branchmap[branchname] = branchheads 271 branchmap[branchname] = branchheads
292 yield branchmap 272 yield branchmap
293 except TypeError: 273 except TypeError:
294 self._abort(error.ResponseError(_("unexpected response:"), d)) 274 self._abort(error.ResponseError(_("unexpected response:"), d))
295 275
296 def branches(self, nodes): 276 @batchable
297 n = encodelist(nodes) 277 def listkeys(self, namespace):
298 d = self._call("branches", nodes=n) 278 if not self.capable('pushkey'):
299 try: 279 yield {}, None
300 br = [tuple(decodelist(b)) for b in d.splitlines()] 280 f = future()
301 return br 281 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
302 except ValueError: 282 yield {'namespace': encoding.fromlocal(namespace)}, f
303 self._abort(error.ResponseError(_("unexpected response:"), d)) 283 d = f.value
304 284 self.ui.debug('received listkey for "%s": %i bytes\n'
305 def between(self, pairs): 285 % (namespace, len(d)))
306 batch = 8 # avoid giant requests 286 yield pushkeymod.decodekeys(d)
307 r = []
308 for i in xrange(0, len(pairs), batch):
309 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
310 d = self._call("between", pairs=n)
311 try:
312 r.extend(l and decodelist(l) or [] for l in d.splitlines())
313 except ValueError:
314 self._abort(error.ResponseError(_("unexpected response:"), d))
315 return r
316 287
317 @batchable 288 @batchable
318 def pushkey(self, namespace, key, old, new): 289 def pushkey(self, namespace, key, old, new):
319 if not self.capable('pushkey'): 290 if not self.capable('pushkey'):
320 yield False, None 291 yield False, None
333 _('push failed (unexpected response):'), d) 304 _('push failed (unexpected response):'), d)
334 for l in output.splitlines(True): 305 for l in output.splitlines(True):
335 self.ui.status(_('remote: '), l) 306 self.ui.status(_('remote: '), l)
336 yield d 307 yield d
337 308
338 @batchable
339 def listkeys(self, namespace):
340 if not self.capable('pushkey'):
341 yield {}, None
342 f = future()
343 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
344 yield {'namespace': encoding.fromlocal(namespace)}, f
345 d = f.value
346 self.ui.debug('received listkey for "%s": %i bytes\n'
347 % (namespace, len(d)))
348 yield pushkeymod.decodekeys(d)
349
350 def stream_out(self): 309 def stream_out(self):
351 return self._callstream('stream_out') 310 return self._callstream('stream_out')
352
353 def changegroup(self, nodes, kind):
354 n = encodelist(nodes)
355 f = self._callcompressable("changegroup", roots=n)
356 return changegroupmod.cg1unpacker(f, 'UN')
357
358 def changegroupsubset(self, bases, heads, kind):
359 self.requirecap('changegroupsubset', _('look up remote changes'))
360 bases = encodelist(bases)
361 heads = encodelist(heads)
362 f = self._callcompressable("changegroupsubset",
363 bases=bases, heads=heads)
364 return changegroupmod.cg1unpacker(f, 'UN')
365 311
366 def getbundle(self, source, **kwargs): 312 def getbundle(self, source, **kwargs):
367 self.requirecap('getbundle', _('look up remote changes')) 313 self.requirecap('getbundle', _('look up remote changes'))
368 opts = {} 314 opts = {}
369 bundlecaps = kwargs.get('bundlecaps') 315 bundlecaps = kwargs.get('bundlecaps')
430 else: 376 else:
431 # bundle2 push. Send a stream, fetch a stream. 377 # bundle2 push. Send a stream, fetch a stream.
432 stream = self._calltwowaystream('unbundle', cg, heads=heads) 378 stream = self._calltwowaystream('unbundle', cg, heads=heads)
433 ret = bundle2.getunbundler(self.ui, stream) 379 ret = bundle2.getunbundler(self.ui, stream)
434 return ret 380 return ret
381
382 # End of basewirepeer interface.
383
384 # Begin of baselegacywirepeer interface.
385
386 def branches(self, nodes):
387 n = encodelist(nodes)
388 d = self._call("branches", nodes=n)
389 try:
390 br = [tuple(decodelist(b)) for b in d.splitlines()]
391 return br
392 except ValueError:
393 self._abort(error.ResponseError(_("unexpected response:"), d))
394
395 def between(self, pairs):
396 batch = 8 # avoid giant requests
397 r = []
398 for i in xrange(0, len(pairs), batch):
399 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
400 d = self._call("between", pairs=n)
401 try:
402 r.extend(l and decodelist(l) or [] for l in d.splitlines())
403 except ValueError:
404 self._abort(error.ResponseError(_("unexpected response:"), d))
405 return r
406
407 def changegroup(self, nodes, kind):
408 n = encodelist(nodes)
409 f = self._callcompressable("changegroup", roots=n)
410 return changegroupmod.cg1unpacker(f, 'UN')
411
412 def changegroupsubset(self, bases, heads, kind):
413 self.requirecap('changegroupsubset', _('look up remote changes'))
414 bases = encodelist(bases)
415 heads = encodelist(heads)
416 f = self._callcompressable("changegroupsubset",
417 bases=bases, heads=heads)
418 return changegroupmod.cg1unpacker(f, 'UN')
419
420 # End of baselegacywirepeer interface.
421
422 def _submitbatch(self, req):
423 """run batch request <req> on the server
424
425 Returns an iterator of the raw responses from the server.
426 """
427 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
428 chunk = rsp.read(1024)
429 work = [chunk]
430 while chunk:
431 while ';' not in chunk and chunk:
432 chunk = rsp.read(1024)
433 work.append(chunk)
434 merged = ''.join(work)
435 while ';' in merged:
436 one, merged = merged.split(';', 1)
437 yield unescapearg(one)
438 chunk = rsp.read(1024)
439 work = [merged, chunk]
440 yield unescapearg(''.join(work))
441
442 def _submitone(self, op, args):
443 return self._call(op, **args)
435 444
436 def debugwireargs(self, one, two, three=None, four=None, five=None): 445 def debugwireargs(self, one, two, three=None, four=None, five=None):
437 # don't pass optional arguments left at their default value 446 # don't pass optional arguments left at their default value
438 opts = {} 447 opts = {}
439 if three is not None: 448 if three is not None: