Mercurial > hg-stable
comparison mercurial/wireproto.py @ 37614:a81d02ea65db
wireproto: move version 1 peer functionality to standalone module (API)
wireproto.py contains code for both the client and the server. There
*should* be a somewhat strong separation between the two.
This commit extracts the client-side code from wireproto.py into a new
module - wireprotov1peer.
Differential Revision: https://phab.mercurial-scm.org/D3259
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 11 Apr 2018 12:49:08 -0700 |
parents | 96d735601ca1 |
children | 379d54eae6eb |
comparison
equal
deleted
inserted
replaced
37613:96d735601ca1 | 37614:a81d02ea65db |
---|---|
5 # This software may be used and distributed according to the terms of the | 5 # This software may be used and distributed according to the terms of the |
6 # GNU General Public License version 2 or any later version. | 6 # GNU General Public License version 2 or any later version. |
7 | 7 |
8 from __future__ import absolute_import | 8 from __future__ import absolute_import |
9 | 9 |
10 import hashlib | |
11 import os | 10 import os |
12 import tempfile | 11 import tempfile |
13 | 12 |
14 from .i18n import _ | 13 from .i18n import _ |
15 from .node import ( | 14 from .node import ( |
16 bin, | |
17 hex, | 15 hex, |
18 nullid, | 16 nullid, |
19 ) | 17 ) |
20 | 18 |
21 from . import ( | 19 from . import ( |
23 changegroup as changegroupmod, | 21 changegroup as changegroupmod, |
24 discovery, | 22 discovery, |
25 encoding, | 23 encoding, |
26 error, | 24 error, |
27 exchange, | 25 exchange, |
28 peer, | |
29 pushkey as pushkeymod, | 26 pushkey as pushkeymod, |
30 pycompat, | 27 pycompat, |
31 repository, | |
32 streamclone, | 28 streamclone, |
33 util, | 29 util, |
34 wireprototypes, | 30 wireprototypes, |
35 ) | 31 ) |
36 | 32 |
44 | 40 |
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required') | 41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required') |
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/' | 42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/' |
47 'IncompatibleClient') | 43 'IncompatibleClient') |
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint) | 44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint) |
49 | |
class remoteiterbatcher(peer.iterbatcher):
    """Batcher that sends queued calls to a remote peer in a single batch.

    Calls are queued through attribute access, shipped to the remote by
    ``submit()`` and decoded one at a time by ``results()``.
    """

    def __init__(self, remote):
        super(remoteiterbatcher, self).__init__()
        self._remote = remote

    def __getattr__(self, name):
        # submit() can only drive @batchable methods, so reject anything
        # else as soon as it is requested rather than at submit time.
        method = getattr(self._remote, name)
        if getattr(method, 'batchable', None):
            return super(remoteiterbatcher, self).__getattr__(name)

        raise error.ProgrammingError('Attempted to batch a non-batchable '
                                     'call to %r' % name)

    def submit(self):
        """Start every queued call and hand the requests to the remote.

        Each queued call's @batchable generator is advanced once to obtain
        its wire arguments and the future that will receive the raw
        response. All requests are then passed to the remote's
        ``_submitbatch()`` in one go; nothing is sent when no call was
        queued. Decoding happens later, in ``results()``.
        """
        # (command, arguments) pairs describing what goes over the wire.
        wirerequests = []

        # (command, final future, @batchable generator, remote future)
        # entries consumed by results().
        pending = []

        for command, args, opts, finalfuture in self.calls:
            method = getattr(self._remote, command)
            gen = method.batchable(method.__self__, *args, **opts)

            # First step of the generator: encoded arguments plus the
            # future the raw response must be stored in.
            commandargs, remotefuture = next(gen)
            assert remotefuture
            wirerequests.append((command, commandargs))
            pending.append((command, finalfuture, gen, remotefuture))

        if wirerequests:
            self._resultiter = self._remote._submitbatch(wirerequests)

        self._results = pending

    def results(self):
        """Yield the decoded result of each submitted call, in call order."""
        for command, finalfuture, gen, remotefuture in self._results:
            # Store the raw response in the remote future, then resume the
            # @batchable generator so it can decode it; the decoded value
            # becomes the value of the final future.
            remotefuture.set(next(self._resultiter))
            finalfuture.set(next(gen))

            # A well-formed @batchable generator emits exactly 2 values, so
            # a third next() has to end the generator.
            exhausted = False
            try:
                next(gen)
            except StopIteration:
                exhausted = True

            if not exhausted:
                raise error.ProgrammingError('%s @batchable generator emitted '
                                             'unexpected value count' % command)

            yield finalfuture.value
112 | |
# Forward a couple of names from peer to make wireproto interactions
# slightly more sensible: code in this module (e.g. the ``@batchable``
# decorators and ``future()`` calls in the peer class below) can then use
# the short names without reaching into ``peer`` each time.
batchable = peer.batchable
future = peer.future
117 | |
118 | |
def encodebatchcmds(req):
    """Build the ``cmds`` argument value for the ``batch`` command.

    ``req`` is an iterable of ``(command name, arguments dict)`` pairs.
    Every pair is rendered as ``<name> <key>=<value>,...`` with keys and
    values escaped, and the rendered commands are joined with ``;``.
    """
    escape = wireprototypes.escapebatcharg

    def encodeone(op, argsdict):
        # Old servers didn't properly unescape argument names, so never
        # send a name that would be altered by escaping.
        assert all(escape(name) == name for name in argsdict)

        pairs = ['%s=%s' % (escape(name), escape(value))
                 for name, value in argsdict.iteritems()]
        return '%s %s' % (op, ','.join(pairs))

    return ';'.join(encodeone(op, argsdict) for op, argsdict in req)
135 | 45 |
136 def clientcompressionsupport(proto): | 46 def clientcompressionsupport(proto): |
137 """Returns a list of compression methods supported by the client. | 47 """Returns a list of compression methods supported by the client. |
138 | 48 |
139 Returns a list of the compression methods supported by the client | 49 Returns a list of the compression methods supported by the client |
142 """ | 52 """ |
143 for cap in proto.getprotocaps(): | 53 for cap in proto.getprotocaps(): |
144 if cap.startswith('comp='): | 54 if cap.startswith('comp='): |
145 return cap[5:].split(',') | 55 return cap[5:].split(',') |
146 return ['zlib', 'none'] | 56 return ['zlib', 'none'] |
147 | |
148 # client side | |
149 | |
class wirepeer(repository.legacypeer):
    """Client-side interface for communicating with a peer repository.

    Methods commonly call wire protocol commands of the same name.

    See also httppeer.py and sshpeer.py for protocol-specific
    implementations of this interface.

    Subclasses supply the transport by implementing ``_call``,
    ``_callstream``, ``_callcompressable``, ``_callpush``,
    ``_calltwowaystream`` and ``_abort``, all of which raise
    ``NotImplementedError`` here.
    """
    # Begin of ipeercommands interface.

    def iterbatch(self):
        """Return a batcher that queues @batchable calls against this peer."""
        return remoteiterbatcher(self)

    @batchable
    def lookup(self, key):
        """Resolve ``key`` on the remote and yield the binary node.

        The response is ``<flag> <data>`` followed by one trailing
        character; a zero flag aborts with ``data`` as the error message.
        """
        self.requirecap('lookup', _('look up remote revision'))
        f = future()
        yield {'key': encoding.fromlocal(key)}, f
        d = f.value
        success, data = d[:-1].split(" ", 1)
        if int(success):
            yield bin(data)
        else:
            self._abort(error.RepoError(data))

    @batchable
    def heads(self):
        """Yield the list of head nodes reported by the remote."""
        f = future()
        yield {}, f
        d = f.value
        try:
            # The last character of the response is not part of the list.
            yield wireprototypes.decodelist(d[:-1])
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def known(self, nodes):
        """Yield one boolean per response character ('0' or '1')."""
        f = future()
        yield {'nodes': wireprototypes.encodelist(nodes)}, f
        d = f.value
        try:
            yield [bool(int(b)) for b in d]
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def branchmap(self):
        """Yield a dict mapping local branch names to lists of head nodes.

        Each response line is ``<quoted branch name> <encoded heads>``. A
        malformed response aborts the connection with a ResponseError.
        """
        f = future()
        yield {}, f
        d = f.value
        try:
            branchmap = {}
            for branchpart in d.splitlines():
                branchname, branchheads = branchpart.split(' ', 1)
                branchname = encoding.tolocal(urlreq.unquote(branchname))
                branchheads = wireprototypes.decodelist(branchheads)
                branchmap[branchname] = branchheads
            yield branchmap
        except (TypeError, ValueError):
            # ValueError covers a line without a space (the tuple unpacking
            # above fails) and, on Python 3, undecodable hex; TypeError is
            # what Python 2's hex decoding raises. Either way the response
            # is malformed, so report it the same way the sibling commands
            # do instead of leaking a raw exception.
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def listkeys(self, namespace):
        """Yield the decoded key/value mapping of a pushkey namespace."""
        if not self.capable('pushkey'):
            # No future is supplied here: presumably the @batchable wrapper
            # treats the first element as the final result in that case --
            # confirm against peer.batchable.
            yield {}, None
        f = future()
        self.ui.debug('preparing listkeys for "%s"\n' % namespace)
        yield {'namespace': encoding.fromlocal(namespace)}, f
        d = f.value
        self.ui.debug('received listkey for "%s": %i bytes\n'
                      % (namespace, len(d)))
        yield pushkeymod.decodekeys(d)

    @batchable
    def pushkey(self, namespace, key, old, new):
        """Update ``key`` in ``namespace`` from ``old`` to ``new``.

        Yields the boolean parsed from the first response line; remaining
        lines are remote output and are relayed to the user.

        Raises ResponseError if the first line is not an integer.
        """
        if not self.capable('pushkey'):
            # See listkeys(): no future means nothing is sent and False is
            # presumably used as the result -- confirm in peer.batchable.
            yield False, None
        f = future()
        self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
        yield {'namespace': encoding.fromlocal(namespace),
               'key': encoding.fromlocal(key),
               'old': encoding.fromlocal(old),
               'new': encoding.fromlocal(new)}, f
        d = f.value
        d, output = d.split('\n', 1)
        try:
            d = bool(int(d))
        except ValueError:
            raise error.ResponseError(
                _('push failed (unexpected response):'), d)
        for l in output.splitlines(True):
            self.ui.status(_('remote: '), l)
        yield d

    def stream_out(self):
        """Return the stream produced by the ``stream_out`` command."""
        return self._callstream('stream_out')

    def getbundle(self, source, **kwargs):
        """Request a bundle and return an unbundler for the response.

        Keyword arguments are encoded according to their type in
        ``wireprototypes.GETBUNDLE_ARGUMENTS``; arguments whose value is
        None are not sent. A bundle2 unbundler is returned when any of the
        ``bundlecaps`` starts with ``HG2``, a cg1 unpacker otherwise.

        Raises ProgrammingError for an argument without a registered type
        and KeyError for a registered type this method cannot encode.
        """
        kwargs = pycompat.byteskwargs(kwargs)
        self.requirecap('getbundle', _('look up remote changes'))
        opts = {}
        bundlecaps = kwargs.get('bundlecaps') or set()
        for key, value in kwargs.iteritems():
            if value is None:
                continue
            keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
            if keytype is None:
                raise error.ProgrammingError(
                    'Unexpectedly None keytype for key %s' % key)
            elif keytype == 'nodes':
                value = wireprototypes.encodelist(value)
            elif keytype == 'csv':
                value = ','.join(value)
            elif keytype == 'scsv':
                # Same as 'csv' but with the values sorted first.
                value = ','.join(sorted(value))
            elif keytype == 'boolean':
                value = '%i' % bool(value)
            elif keytype != 'plain':
                raise KeyError('unknown getbundle option type %s'
                               % keytype)
            opts[key] = value
        f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
        if any((cap.startswith('HG2') for cap in bundlecaps)):
            return bundle2.getunbundler(self.ui, f)
        else:
            return changegroupmod.cg1unpacker(f, 'UN')

    def unbundle(self, cg, heads, url):
        '''Send cg (a readable file-like object representing the
        changegroup to push, typically a chunkbuffer object) to the
        remote server as a bundle.

        When pushing a bundle10 stream, return an integer indicating the
        result of the push (see changegroup.apply()).

        When pushing a bundle20 stream, return a bundle20 stream.

        `url` is the url the client thinks it's pushing to, which is
        visible to hooks.

        Raises ResponseError when a bundle10 push returns an empty or
        non-integer result.
        '''

        if heads != ['force'] and self.capable('unbundlehash'):
            # Send a SHA-1 digest of the sorted heads rather than the
            # heads themselves.
            heads = wireprototypes.encodelist(
                ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
        else:
            heads = wireprototypes.encodelist(heads)

        if util.safehasattr(cg, 'deltaheader'):
            # this is a bundle10, do the old style call sequence
            ret, output = self._callpush("unbundle", cg, heads=heads)
            if ret == "":
                raise error.ResponseError(
                    _('push failed:'), output)
            try:
                ret = int(ret)
            except ValueError:
                raise error.ResponseError(
                    _('push failed (unexpected response):'), ret)

            for l in output.splitlines(True):
                self.ui.status(_('remote: '), l)
        else:
            # bundle2 push. Send a stream, fetch a stream.
            stream = self._calltwowaystream('unbundle', cg, heads=heads)
            ret = bundle2.getunbundler(self.ui, stream)
        return ret

    # End of ipeercommands interface.

    # Begin of ipeerlegacycommands interface.

    def branches(self, nodes):
        """Return one tuple of decoded nodes per line of the response."""
        n = wireprototypes.encodelist(nodes)
        d = self._call("branches", nodes=n)
        try:
            br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
            return br
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    def between(self, pairs):
        """Return one list of nodes per response line, for all ``pairs``.

        An empty response line produces an empty list.
        """
        batch = 8 # avoid giant requests
        r = []
        for i in xrange(0, len(pairs), batch):
            n = " ".join([wireprototypes.encodelist(p, '-')
                          for p in pairs[i:i + batch]])
            d = self._call("between", pairs=n)
            try:
                r.extend(wireprototypes.decodelist(l) if l else []
                         for l in d.splitlines())
            except ValueError:
                self._abort(error.ResponseError(_("unexpected response:"), d))
        return r

    def changegroup(self, nodes, kind):
        """Return a cg1 unpacker for the changegroup rooted at ``nodes``.

        ``kind`` is not used by this implementation.
        """
        n = wireprototypes.encodelist(nodes)
        f = self._callcompressable("changegroup", roots=n)
        return changegroupmod.cg1unpacker(f, 'UN')

    def changegroupsubset(self, bases, heads, kind):
        """Return a cg1 unpacker for the changes between bases and heads.

        ``kind`` is not used by this implementation.
        """
        self.requirecap('changegroupsubset', _('look up remote changes'))
        bases = wireprototypes.encodelist(bases)
        heads = wireprototypes.encodelist(heads)
        f = self._callcompressable("changegroupsubset",
                                   bases=bases, heads=heads)
        return changegroupmod.cg1unpacker(f, 'UN')

    # End of ipeerlegacycommands interface.

    def _submitbatch(self, req):
        """run batch request <req> on the server

        Returns an iterator of the raw responses from the server.
        """
        ui = self.ui
        if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
            ui.debug('devel-peer-request: batched-content\n')
            for op, args in req:
                msg = 'devel-peer-request:   - %s (%d arguments)\n'
                ui.debug(msg % (op, len(args)))

        unescapearg = wireprototypes.unescapebatcharg

        rsp = self._callstream("batch", cmds=encodebatchcmds(req))
        # Individual results are separated by ';'. Read the stream in
        # 1024 byte chunks and emit every complete result as soon as its
        # separator has been seen.
        chunk = rsp.read(1024)
        work = [chunk]
        while chunk:
            # Keep reading until a separator shows up or the stream ends.
            while ';' not in chunk and chunk:
                chunk = rsp.read(1024)
                work.append(chunk)
            merged = ''.join(work)
            while ';' in merged:
                one, merged = merged.split(';', 1)
                yield unescapearg(one)
            chunk = rsp.read(1024)
            # Carry the incomplete tail over to the next round.
            work = [merged, chunk]
        # Whatever is left once the stream is exhausted is the last result.
        yield unescapearg(''.join(work))

    def _submitone(self, op, args):
        """Run a single command with ``args`` passed as keyword arguments."""
        return self._call(op, **pycompat.strkwargs(args))

    def debugwireargs(self, one, two, three=None, four=None, five=None):
        """Call ``debugwireargs`` on the server and return its reply.

        ``five`` is accepted but never sent to the server.
        """
        # don't pass optional arguments left at their default value
        opts = {}
        if three is not None:
            opts[r'three'] = three
        if four is not None:
            opts[r'four'] = four
        return self._call('debugwireargs', one=one, two=two, **opts)

    def _call(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a simple string.

        returns the server reply as a string."""
        raise NotImplementedError()

    def _callstream(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream. Note that if the
        command doesn't return a stream, _callstream behaves
        differently for ssh and http peers.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callcompressable(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream.

        The stream may have been compressed in some implementations. This
        function takes care of the decompression. This is the only difference
        with _callstream.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callpush(self, cmd, fp, **args):
        """execute a <cmd> on server

        The command is expected to be related to a push. Push has a special
        return method.

        returns the server reply as a (ret, output) tuple. ret is either
        empty (error) or a stringified int.
        """
        raise NotImplementedError()

    def _calltwowaystream(self, cmd, fp, **args):
        """execute <cmd> on server

        The command will send a stream to the server and get a stream in reply.
        """
        raise NotImplementedError()

    def _abort(self, exception):
        """clearly abort the wire protocol connection and raise the exception
        """
        raise NotImplementedError()
454 | |
455 # server side | |
456 | 57 |
457 # wire protocol command can either return a string or one of these classes. | 58 # wire protocol command can either return a string or one of these classes. |
458 | 59 |
459 def getdispatchrepo(repo, proto, command): | 60 def getdispatchrepo(repo, proto, command): |
460 """Obtain the repo used for processing wire protocol commands. | 61 """Obtain the repo used for processing wire protocol commands. |