comparison mercurial/wireproto.py @ 37614:a81d02ea65db

wireproto: move version 1 peer functionality to standalone module (API) wireproto.py contains code for both the client and the server. There *should* be a somewhat strong separation between the two. This commit extracts the client-side code from wireproto.py into a new module - wireprotov1peer. Differential Revision: https://phab.mercurial-scm.org/D3259
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 11 Apr 2018 12:49:08 -0700
parents 96d735601ca1
children 379d54eae6eb
comparison
equal deleted inserted replaced
37613:96d735601ca1 37614:a81d02ea65db
5 # This software may be used and distributed according to the terms of the 5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version. 6 # GNU General Public License version 2 or any later version.
7 7
8 from __future__ import absolute_import 8 from __future__ import absolute_import
9 9
10 import hashlib
11 import os 10 import os
12 import tempfile 11 import tempfile
13 12
14 from .i18n import _ 13 from .i18n import _
15 from .node import ( 14 from .node import (
16 bin,
17 hex, 15 hex,
18 nullid, 16 nullid,
19 ) 17 )
20 18
21 from . import ( 19 from . import (
23 changegroup as changegroupmod, 21 changegroup as changegroupmod,
24 discovery, 22 discovery,
25 encoding, 23 encoding,
26 error, 24 error,
27 exchange, 25 exchange,
28 peer,
29 pushkey as pushkeymod, 26 pushkey as pushkeymod,
30 pycompat, 27 pycompat,
31 repository,
32 streamclone, 28 streamclone,
33 util, 29 util,
34 wireprototypes, 30 wireprototypes,
35 ) 31 )
36 32
44 40
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required') 41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/' 42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
47 'IncompatibleClient') 43 'IncompatibleClient')
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint) 44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49
50 class remoteiterbatcher(peer.iterbatcher):
51 def __init__(self, remote):
52 super(remoteiterbatcher, self).__init__()
53 self._remote = remote
54
55 def __getattr__(self, name):
56 # Validate this method is batchable, since submit() only supports
57 # batchable methods.
58 fn = getattr(self._remote, name)
59 if not getattr(fn, 'batchable', None):
60 raise error.ProgrammingError('Attempted to batch a non-batchable '
61 'call to %r' % name)
62
63 return super(remoteiterbatcher, self).__getattr__(name)
64
65 def submit(self):
66 """Break the batch request into many patch calls and pipeline them.
67
68 This is mostly valuable over http where request sizes can be
69 limited, but can be used in other places as well.
70 """
71 # 2-tuple of (command, arguments) that represents what will be
72 # sent over the wire.
73 requests = []
74
75 # 4-tuple of (command, final future, @batchable generator, remote
76 # future).
77 results = []
78
79 for command, args, opts, finalfuture in self.calls:
80 mtd = getattr(self._remote, command)
81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
82
83 commandargs, fremote = next(batchable)
84 assert fremote
85 requests.append((command, commandargs))
86 results.append((command, finalfuture, batchable, fremote))
87
88 if requests:
89 self._resultiter = self._remote._submitbatch(requests)
90
91 self._results = results
92
93 def results(self):
94 for command, finalfuture, batchable, remotefuture in self._results:
95 # Get the raw result, set it in the remote future, feed it
96 # back into the @batchable generator so it can be decoded, and
97 # set the result on the final future to this value.
98 remoteresult = next(self._resultiter)
99 remotefuture.set(remoteresult)
100 finalfuture.set(next(batchable))
101
102 # Verify our @batchable generators only emit 2 values.
103 try:
104 next(batchable)
105 except StopIteration:
106 pass
107 else:
108 raise error.ProgrammingError('%s @batchable generator emitted '
109 'unexpected value count' % command)
110
111 yield finalfuture.value
112
113 # Forward a couple of names from peer to make wireproto interactions
114 # slightly more sensible.
115 batchable = peer.batchable
116 future = peer.future
117
118
119 def encodebatchcmds(req):
120 """Return a ``cmds`` argument value for the ``batch`` command."""
121 escapearg = wireprototypes.escapebatcharg
122
123 cmds = []
124 for op, argsdict in req:
125 # Old servers didn't properly unescape argument names. So prevent
126 # the sending of argument names that may not be decoded properly by
127 # servers.
128 assert all(escapearg(k) == k for k in argsdict)
129
130 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
131 for k, v in argsdict.iteritems())
132 cmds.append('%s %s' % (op, args))
133
134 return ';'.join(cmds)
135 45
136 def clientcompressionsupport(proto): 46 def clientcompressionsupport(proto):
137 """Returns a list of compression methods supported by the client. 47 """Returns a list of compression methods supported by the client.
138 48
139 Returns a list of the compression methods supported by the client 49 Returns a list of the compression methods supported by the client
142 """ 52 """
143 for cap in proto.getprotocaps(): 53 for cap in proto.getprotocaps():
144 if cap.startswith('comp='): 54 if cap.startswith('comp='):
145 return cap[5:].split(',') 55 return cap[5:].split(',')
146 return ['zlib', 'none'] 56 return ['zlib', 'none']
147
148 # client side
149
150 class wirepeer(repository.legacypeer):
151 """Client-side interface for communicating with a peer repository.
152
153 Methods commonly call wire protocol commands of the same name.
154
155 See also httppeer.py and sshpeer.py for protocol-specific
156 implementations of this interface.
157 """
158 # Begin of ipeercommands interface.
159
160 def iterbatch(self):
161 return remoteiterbatcher(self)
162
163 @batchable
164 def lookup(self, key):
165 self.requirecap('lookup', _('look up remote revision'))
166 f = future()
167 yield {'key': encoding.fromlocal(key)}, f
168 d = f.value
169 success, data = d[:-1].split(" ", 1)
170 if int(success):
171 yield bin(data)
172 else:
173 self._abort(error.RepoError(data))
174
175 @batchable
176 def heads(self):
177 f = future()
178 yield {}, f
179 d = f.value
180 try:
181 yield wireprototypes.decodelist(d[:-1])
182 except ValueError:
183 self._abort(error.ResponseError(_("unexpected response:"), d))
184
185 @batchable
186 def known(self, nodes):
187 f = future()
188 yield {'nodes': wireprototypes.encodelist(nodes)}, f
189 d = f.value
190 try:
191 yield [bool(int(b)) for b in d]
192 except ValueError:
193 self._abort(error.ResponseError(_("unexpected response:"), d))
194
195 @batchable
196 def branchmap(self):
197 f = future()
198 yield {}, f
199 d = f.value
200 try:
201 branchmap = {}
202 for branchpart in d.splitlines():
203 branchname, branchheads = branchpart.split(' ', 1)
204 branchname = encoding.tolocal(urlreq.unquote(branchname))
205 branchheads = wireprototypes.decodelist(branchheads)
206 branchmap[branchname] = branchheads
207 yield branchmap
208 except TypeError:
209 self._abort(error.ResponseError(_("unexpected response:"), d))
210
211 @batchable
212 def listkeys(self, namespace):
213 if not self.capable('pushkey'):
214 yield {}, None
215 f = future()
216 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
217 yield {'namespace': encoding.fromlocal(namespace)}, f
218 d = f.value
219 self.ui.debug('received listkey for "%s": %i bytes\n'
220 % (namespace, len(d)))
221 yield pushkeymod.decodekeys(d)
222
223 @batchable
224 def pushkey(self, namespace, key, old, new):
225 if not self.capable('pushkey'):
226 yield False, None
227 f = future()
228 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
229 yield {'namespace': encoding.fromlocal(namespace),
230 'key': encoding.fromlocal(key),
231 'old': encoding.fromlocal(old),
232 'new': encoding.fromlocal(new)}, f
233 d = f.value
234 d, output = d.split('\n', 1)
235 try:
236 d = bool(int(d))
237 except ValueError:
238 raise error.ResponseError(
239 _('push failed (unexpected response):'), d)
240 for l in output.splitlines(True):
241 self.ui.status(_('remote: '), l)
242 yield d
243
244 def stream_out(self):
245 return self._callstream('stream_out')
246
247 def getbundle(self, source, **kwargs):
248 kwargs = pycompat.byteskwargs(kwargs)
249 self.requirecap('getbundle', _('look up remote changes'))
250 opts = {}
251 bundlecaps = kwargs.get('bundlecaps') or set()
252 for key, value in kwargs.iteritems():
253 if value is None:
254 continue
255 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
256 if keytype is None:
257 raise error.ProgrammingError(
258 'Unexpectedly None keytype for key %s' % key)
259 elif keytype == 'nodes':
260 value = wireprototypes.encodelist(value)
261 elif keytype == 'csv':
262 value = ','.join(value)
263 elif keytype == 'scsv':
264 value = ','.join(sorted(value))
265 elif keytype == 'boolean':
266 value = '%i' % bool(value)
267 elif keytype != 'plain':
268 raise KeyError('unknown getbundle option type %s'
269 % keytype)
270 opts[key] = value
271 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
272 if any((cap.startswith('HG2') for cap in bundlecaps)):
273 return bundle2.getunbundler(self.ui, f)
274 else:
275 return changegroupmod.cg1unpacker(f, 'UN')
276
277 def unbundle(self, cg, heads, url):
278 '''Send cg (a readable file-like object representing the
279 changegroup to push, typically a chunkbuffer object) to the
280 remote server as a bundle.
281
282 When pushing a bundle10 stream, return an integer indicating the
283 result of the push (see changegroup.apply()).
284
285 When pushing a bundle20 stream, return a bundle20 stream.
286
287 `url` is the url the client thinks it's pushing to, which is
288 visible to hooks.
289 '''
290
291 if heads != ['force'] and self.capable('unbundlehash'):
292 heads = wireprototypes.encodelist(
293 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
294 else:
295 heads = wireprototypes.encodelist(heads)
296
297 if util.safehasattr(cg, 'deltaheader'):
298 # this a bundle10, do the old style call sequence
299 ret, output = self._callpush("unbundle", cg, heads=heads)
300 if ret == "":
301 raise error.ResponseError(
302 _('push failed:'), output)
303 try:
304 ret = int(ret)
305 except ValueError:
306 raise error.ResponseError(
307 _('push failed (unexpected response):'), ret)
308
309 for l in output.splitlines(True):
310 self.ui.status(_('remote: '), l)
311 else:
312 # bundle2 push. Send a stream, fetch a stream.
313 stream = self._calltwowaystream('unbundle', cg, heads=heads)
314 ret = bundle2.getunbundler(self.ui, stream)
315 return ret
316
317 # End of ipeercommands interface.
318
319 # Begin of ipeerlegacycommands interface.
320
321 def branches(self, nodes):
322 n = wireprototypes.encodelist(nodes)
323 d = self._call("branches", nodes=n)
324 try:
325 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
326 return br
327 except ValueError:
328 self._abort(error.ResponseError(_("unexpected response:"), d))
329
330 def between(self, pairs):
331 batch = 8 # avoid giant requests
332 r = []
333 for i in xrange(0, len(pairs), batch):
334 n = " ".join([wireprototypes.encodelist(p, '-')
335 for p in pairs[i:i + batch]])
336 d = self._call("between", pairs=n)
337 try:
338 r.extend(l and wireprototypes.decodelist(l) or []
339 for l in d.splitlines())
340 except ValueError:
341 self._abort(error.ResponseError(_("unexpected response:"), d))
342 return r
343
344 def changegroup(self, nodes, kind):
345 n = wireprototypes.encodelist(nodes)
346 f = self._callcompressable("changegroup", roots=n)
347 return changegroupmod.cg1unpacker(f, 'UN')
348
349 def changegroupsubset(self, bases, heads, kind):
350 self.requirecap('changegroupsubset', _('look up remote changes'))
351 bases = wireprototypes.encodelist(bases)
352 heads = wireprototypes.encodelist(heads)
353 f = self._callcompressable("changegroupsubset",
354 bases=bases, heads=heads)
355 return changegroupmod.cg1unpacker(f, 'UN')
356
357 # End of ipeerlegacycommands interface.
358
359 def _submitbatch(self, req):
360 """run batch request <req> on the server
361
362 Returns an iterator of the raw responses from the server.
363 """
364 ui = self.ui
365 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
366 ui.debug('devel-peer-request: batched-content\n')
367 for op, args in req:
368 msg = 'devel-peer-request: - %s (%d arguments)\n'
369 ui.debug(msg % (op, len(args)))
370
371 unescapearg = wireprototypes.unescapebatcharg
372
373 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
374 chunk = rsp.read(1024)
375 work = [chunk]
376 while chunk:
377 while ';' not in chunk and chunk:
378 chunk = rsp.read(1024)
379 work.append(chunk)
380 merged = ''.join(work)
381 while ';' in merged:
382 one, merged = merged.split(';', 1)
383 yield unescapearg(one)
384 chunk = rsp.read(1024)
385 work = [merged, chunk]
386 yield unescapearg(''.join(work))
387
388 def _submitone(self, op, args):
389 return self._call(op, **pycompat.strkwargs(args))
390
391 def debugwireargs(self, one, two, three=None, four=None, five=None):
392 # don't pass optional arguments left at their default value
393 opts = {}
394 if three is not None:
395 opts[r'three'] = three
396 if four is not None:
397 opts[r'four'] = four
398 return self._call('debugwireargs', one=one, two=two, **opts)
399
400 def _call(self, cmd, **args):
401 """execute <cmd> on the server
402
403 The command is expected to return a simple string.
404
405 returns the server reply as a string."""
406 raise NotImplementedError()
407
408 def _callstream(self, cmd, **args):
409 """execute <cmd> on the server
410
411 The command is expected to return a stream. Note that if the
412 command doesn't return a stream, _callstream behaves
413 differently for ssh and http peers.
414
415 returns the server reply as a file like object.
416 """
417 raise NotImplementedError()
418
419 def _callcompressable(self, cmd, **args):
420 """execute <cmd> on the server
421
422 The command is expected to return a stream.
423
424 The stream may have been compressed in some implementations. This
425 function takes care of the decompression. This is the only difference
426 with _callstream.
427
428 returns the server reply as a file like object.
429 """
430 raise NotImplementedError()
431
432 def _callpush(self, cmd, fp, **args):
433 """execute a <cmd> on server
434
435 The command is expected to be related to a push. Push has a special
436 return method.
437
438 returns the server reply as a (ret, output) tuple. ret is either
439 empty (error) or a stringified int.
440 """
441 raise NotImplementedError()
442
443 def _calltwowaystream(self, cmd, fp, **args):
444 """execute <cmd> on server
445
446 The command will send a stream to the server and get a stream in reply.
447 """
448 raise NotImplementedError()
449
450 def _abort(self, exception):
451 """clearly abort the wire protocol connection and raise the exception
452 """
453 raise NotImplementedError()
454
455 # server side
456 57
457 # wire protocol command can either return a string or one of these classes. 58 # wire protocol command can either return a string or one of these classes.
458 59
459 def getdispatchrepo(repo, proto, command): 60 def getdispatchrepo(repo, proto, command):
460 """Obtain the repo used for processing wire protocol commands. 61 """Obtain the repo used for processing wire protocol commands.