Mercurial > hg
comparison mercurial/wireprotov1peer.py @ 37614:a81d02ea65db
wireproto: move version 1 peer functionality to standalone module (API)
wireproto.py contains code for both the client and the server. There
*should* be a somewhat strong separation between the two.
This commit extracts the client-side code from wireproto.py into a new
module - wireprotov1peer.
Differential Revision: https://phab.mercurial-scm.org/D3259
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 11 Apr 2018 12:49:08 -0700 |
parents | mercurial/wireproto.py@96d735601ca1 |
children | f3dc8239e3a9 |
comparison
equal
deleted
inserted
replaced
37613:96d735601ca1 | 37614:a81d02ea65db |
---|---|
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1. | |
2 # | |
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com> | |
4 # | |
5 # This software may be used and distributed according to the terms of the | |
6 # GNU General Public License version 2 or any later version. | |
7 | |
8 from __future__ import absolute_import | |
9 | |
10 import hashlib | |
11 | |
12 from .i18n import _ | |
13 from .node import ( | |
14 bin, | |
15 ) | |
16 | |
17 from . import ( | |
18 bundle2, | |
19 changegroup as changegroupmod, | |
20 encoding, | |
21 error, | |
22 peer, | |
23 pushkey as pushkeymod, | |
24 pycompat, | |
25 repository, | |
26 util, | |
27 wireprototypes, | |
28 ) | |
29 | |
30 urlreq = util.urlreq | |
31 | |
class remoteiterbatcher(peer.iterbatcher):
    """Iterating batcher that sends its queued calls to a remote peer.

    ``submit()`` turns the entries of ``self.calls`` into one
    ``_submitbatch()`` request on the remote; ``results()`` then decodes
    the responses one at a time, in the order the calls were queued.
    """

    def __init__(self, remote):
        super(remoteiterbatcher, self).__init__()
        self._remote = remote

    def __getattr__(self, name):
        # submit() can only deal with @batchable methods, so anything else
        # is rejected here, at the point where the call is being queued.
        method = getattr(self._remote, name)
        if getattr(method, 'batchable', None):
            return super(remoteiterbatcher, self).__getattr__(name)

        raise error.ProgrammingError('Attempted to batch a non-batchable '
                                     'call to %r' % name)

    def submit(self):
        """Send every queued call to the remote as one batch request.

        Each queued call's @batchable generator is started to obtain the
        arguments to send and the future that will receive the raw
        response. The bookkeeping needed to decode the responses is kept
        in ``self._results`` for ``results()``.
        """
        # (command, arguments) pairs, i.e. what goes over the wire.
        wirerequests = []

        # (command, final future, @batchable generator, remote future)
        # entries, one per queued call.
        pending = []

        for command, args, opts, finalfuture in self.calls:
            method = getattr(self._remote, command)
            generator = method.batchable(method.__self__, *args, **opts)

            commandargs, remotefuture = next(generator)
            assert remotefuture
            wirerequests.append((command, commandargs))
            pending.append((command, finalfuture, generator, remotefuture))

        # Nothing queued means nothing is sent to the remote at all.
        if wirerequests:
            self._resultiter = self._remote._submitbatch(wirerequests)

        self._results = pending

    def results(self):
        """Yield the decoded result of each submitted call, in order."""
        for command, finalfuture, generator, remotefuture in self._results:
            # Hand the raw response to the generator through its remote
            # future, then resume the generator so it can decode it; the
            # decoded value becomes the value of the final future.
            remotefuture.set(next(self._resultiter))
            finalfuture.set(next(generator))

            # A @batchable generator must be exhausted after two values.
            exhausted = False
            try:
                next(generator)
            except StopIteration:
                exhausted = True

            if not exhausted:
                raise error.ProgrammingError('%s @batchable generator emitted '
                                             'unexpected value count' % command)

            yield finalfuture.value
94 | |
# Expose two names from ``peer`` at module level; this makes wireproto
# interactions slightly more sensible.
batchable, future = peer.batchable, peer.future
99 | |
def encodebatchcmds(req):
    """Build the ``cmds`` argument value sent with the ``batch`` command.

    ``req`` is an iterable of ``(command name, arguments dict)`` pairs.
    Each pair is rendered as ``name key=value,key=value`` with keys and
    values passed through ``wireprototypes.escapebatcharg``; the rendered
    commands are then joined with ``;``.
    """
    escape = wireprototypes.escapebatcharg

    encoded = []
    for name, arguments in req:
        # Argument names were not unescaped properly by old servers, so
        # refuse to send any name that escaping would alter.
        assert all(escape(key) == key for key in arguments)

        pairs = ['%s=%s' % (escape(key), escape(value))
                 for key, value in arguments.iteritems()]
        encoded.append('%s %s' % (name, ','.join(pairs)))

    return ';'.join(encoded)
116 | |
class wirepeer(repository.legacypeer):
    """Client-side interface for communicating with a peer repository.

    Methods commonly call wire protocol commands of the same name.

    The transport itself is not implemented here: ``_call``,
    ``_callstream``, ``_callcompressable``, ``_callpush``,
    ``_calltwowaystream`` and ``_abort`` all raise NotImplementedError
    in this class.

    See also httppeer.py and sshpeer.py for protocol-specific
    implementations of this interface.
    """
    # Begin of ipeercommands interface.

    def iterbatch(self):
        """Return a ``remoteiterbatcher`` bound to this peer."""
        return remoteiterbatcher(self)

    @batchable
    def lookup(self, key):
        """Resolve ``key`` on the remote and yield the binary node.

        Requires the 'lookup' capability. The response has the form
        ``<success> <data>`` followed by one trailing character, which is
        dropped. A non-zero ``success`` yields ``bin(data)``; otherwise
        ``data`` is reported through ``_abort()`` as a RepoError.
        """
        self.requirecap('lookup', _('look up remote revision'))
        f = future()
        yield {'key': encoding.fromlocal(key)}, f
        d = f.value
        success, data = d[:-1].split(" ", 1)
        if int(success):
            yield bin(data)
        else:
            self._abort(error.RepoError(data))

    @batchable
    def heads(self):
        """Yield the list decoded from the remote's response.

        The last character of the response is dropped before it is passed
        to ``wireprototypes.decodelist``. A ValueError is reported through
        ``_abort()`` as a ResponseError.
        """
        f = future()
        yield {}, f
        d = f.value
        try:
            yield wireprototypes.decodelist(d[:-1])
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def known(self, nodes):
        """Yield a list of booleans, one per character of the response.

        ``nodes`` is sent encoded with ``wireprototypes.encodelist``. Each
        response character is converted with ``bool(int(...))``; a
        character that is not a digit is reported through ``_abort()`` as
        a ResponseError.
        """
        f = future()
        yield {'nodes': wireprototypes.encodelist(nodes)}, f
        d = f.value
        try:
            yield [bool(int(b)) for b in d]
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def branchmap(self):
        """Yield a dict mapping branch names to their decoded head lists.

        Every line of the response is ``<quoted branch name> <heads>``.
        The name is URL-unquoted and converted with ``encoding.tolocal``;
        the heads are decoded with ``wireprototypes.decodelist``. A
        malformed response is reported through ``_abort()`` as a
        ResponseError.
        """
        f = future()
        yield {}, f
        d = f.value
        try:
            branchmap = {}
            for branchpart in d.splitlines():
                # Unpacking raises ValueError when the line has no space.
                branchname, branchheads = branchpart.split(' ', 1)
                branchname = encoding.tolocal(urlreq.unquote(branchname))
                branchheads = wireprototypes.decodelist(branchheads)
                branchmap[branchname] = branchheads
            yield branchmap
        except (TypeError, ValueError):
            # ValueError is caught as well as TypeError so that a line
            # without a separator aborts cleanly instead of escaping as a
            # bare ValueError.
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def listkeys(self, namespace):
        """Yield the keys of ``namespace`` decoded from the response.

        The response is decoded with ``pushkeymod.decodekeys``.
        """
        if not self.capable('pushkey'):
            # NOTE(review): there is no return after this yield;
            # presumably the @batchable wrapper stops driving the
            # generator when the future is None - confirm against
            # peer.batchable.
            yield {}, None
        f = future()
        self.ui.debug('preparing listkeys for "%s"\n' % namespace)
        yield {'namespace': encoding.fromlocal(namespace)}, f
        d = f.value
        self.ui.debug('received listkey for "%s": %i bytes\n'
                      % (namespace, len(d)))
        yield pushkeymod.decodekeys(d)

    @batchable
    def pushkey(self, namespace, key, old, new):
        """Update ``key`` in ``namespace`` from ``old`` to ``new``.

        The response is ``<int>\\n<output>``. The integer is yielded as a
        boolean and every line of the output is shown through
        ``ui.status`` with a 'remote: ' prefix. A first line that is not
        an integer raises ResponseError.
        """
        if not self.capable('pushkey'):
            # NOTE(review): no return after this yield either; see the
            # same pattern in listkeys().
            yield False, None
        f = future()
        self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
        yield {'namespace': encoding.fromlocal(namespace),
               'key': encoding.fromlocal(key),
               'old': encoding.fromlocal(old),
               'new': encoding.fromlocal(new)}, f
        d = f.value
        d, output = d.split('\n', 1)
        try:
            d = bool(int(d))
        except ValueError:
            raise error.ResponseError(
                _('push failed (unexpected response):'), d)
        for l in output.splitlines(True):
            self.ui.status(_('remote: '), l)
        yield d

    def stream_out(self):
        """Return the stream produced by the ``stream_out`` command."""
        return self._callstream('stream_out')

    def getbundle(self, source, **kwargs):
        """Request a bundle from the peer and return an unpacker for it.

        Requires the 'getbundle' capability. Keyword arguments whose value
        is None are dropped. Every other argument is encoded for the wire
        according to its entry in ``wireprototypes.GETBUNDLE_ARGUMENTS``
        ('nodes', 'csv', 'scsv', 'boolean' or 'plain'). A key without an
        entry raises ProgrammingError; an entry of any other type raises
        KeyError.

        Returns ``bundle2.getunbundler(...)`` when any value in
        ``bundlecaps`` starts with 'HG2', and
        ``changegroupmod.cg1unpacker(f, 'UN')`` otherwise. ``source`` is
        not used by this method.
        """
        kwargs = pycompat.byteskwargs(kwargs)
        self.requirecap('getbundle', _('look up remote changes'))
        opts = {}
        bundlecaps = kwargs.get('bundlecaps') or set()
        for key, value in kwargs.iteritems():
            if value is None:
                continue
            keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
            if keytype is None:
                raise error.ProgrammingError(
                    'Unexpectedly None keytype for key %s' % key)
            elif keytype == 'nodes':
                value = wireprototypes.encodelist(value)
            elif keytype == 'csv':
                value = ','.join(value)
            elif keytype == 'scsv':
                # sorted, so the encoded value does not depend on the
                # iteration order of the input
                value = ','.join(sorted(value))
            elif keytype == 'boolean':
                value = '%i' % bool(value)
            elif keytype != 'plain':
                raise KeyError('unknown getbundle option type %s'
                               % keytype)
            opts[key] = value
        f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
        if any((cap.startswith('HG2') for cap in bundlecaps)):
            return bundle2.getunbundler(self.ui, f)
        else:
            return changegroupmod.cg1unpacker(f, 'UN')

    def unbundle(self, cg, heads, url):
        '''Send cg (a readable file-like object representing the
        changegroup to push, typically a chunkbuffer object) to the
        remote server as a bundle.

        When pushing a bundle10 stream, return an integer indicating the
        result of the push (see changegroup.apply()).

        When pushing a bundle20 stream, return a bundle20 stream.

        `url` is the url the client thinks it's pushing to, which is
        visible to hooks.
        '''

        # Unless the push is forced, a peer with the 'unbundlehash'
        # capability is sent the SHA-1 digest of the sorted heads instead
        # of the heads themselves.
        if heads != ['force'] and self.capable('unbundlehash'):
            heads = wireprototypes.encodelist(
                ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
        else:
            heads = wireprototypes.encodelist(heads)

        if util.safehasattr(cg, 'deltaheader'):
            # this is a bundle10, do the old style call sequence
            ret, output = self._callpush("unbundle", cg, heads=heads)
            if ret == "":
                raise error.ResponseError(
                    _('push failed:'), output)
            try:
                ret = int(ret)
            except ValueError:
                raise error.ResponseError(
                    _('push failed (unexpected response):'), ret)

            for l in output.splitlines(True):
                self.ui.status(_('remote: '), l)
        else:
            # bundle2 push. Send a stream, fetch a stream.
            stream = self._calltwowaystream('unbundle', cg, heads=heads)
            ret = bundle2.getunbundler(self.ui, stream)
        return ret

    # End of ipeercommands interface.

    # Begin of ipeerlegacycommands interface.

    def branches(self, nodes):
        """Return one tuple of decoded nodes per line of the response.

        A ValueError while decoding is reported through ``_abort()`` as a
        ResponseError.
        """
        n = wireprototypes.encodelist(nodes)
        d = self._call("branches", nodes=n)
        try:
            br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
            return br
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    def between(self, pairs):
        """Return one decoded node list per response line.

        ``pairs`` is sent in slices of eight per ``between`` call. An
        empty response line produces an empty list. A ValueError while
        decoding is reported through ``_abort()`` as a ResponseError.
        """
        batch = 8 # avoid giant requests
        r = []
        for i in xrange(0, len(pairs), batch):
            n = " ".join([wireprototypes.encodelist(p, '-')
                          for p in pairs[i:i + batch]])
            d = self._call("between", pairs=n)
            try:
                r.extend(wireprototypes.decodelist(l) if l else []
                         for l in d.splitlines())
            except ValueError:
                self._abort(error.ResponseError(_("unexpected response:"), d))
        return r

    def changegroup(self, nodes, kind):
        """Return a cg1unpacker over the ``changegroup`` response.

        ``nodes`` is sent as the ``roots`` argument. ``kind`` is not used
        by this method.
        """
        n = wireprototypes.encodelist(nodes)
        f = self._callcompressable("changegroup", roots=n)
        return changegroupmod.cg1unpacker(f, 'UN')

    def changegroupsubset(self, bases, heads, kind):
        """Return a cg1unpacker over the ``changegroupsubset`` response.

        Requires the 'changegroupsubset' capability. ``kind`` is not used
        by this method.
        """
        self.requirecap('changegroupsubset', _('look up remote changes'))
        bases = wireprototypes.encodelist(bases)
        heads = wireprototypes.encodelist(heads)
        f = self._callcompressable("changegroupsubset",
                                   bases=bases, heads=heads)
        return changegroupmod.cg1unpacker(f, 'UN')

    # End of ipeerlegacycommands interface.

    def _submitbatch(self, req):
        """run batch request <req> on the server

        Returns an iterator of the raw responses from the server.

        The response stream is read in 1024-byte chunks and split on ';'.
        Each piece is passed through ``wireprototypes.unescapebatcharg``
        before being yielded; whatever follows the last ';' is yielded
        once the stream is exhausted.
        """
        ui = self.ui
        if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
            ui.debug('devel-peer-request: batched-content\n')
            for op, args in req:
                msg = 'devel-peer-request:    - %s (%d arguments)\n'
                ui.debug(msg % (op, len(args)))

        unescapearg = wireprototypes.unescapebatcharg

        rsp = self._callstream("batch", cmds=encodebatchcmds(req))
        chunk = rsp.read(1024)
        # text read so far that has not been yielded yet
        work = [chunk]
        while chunk:
            # keep reading until a separator shows up or the stream ends
            while ';' not in chunk and chunk:
                chunk = rsp.read(1024)
                work.append(chunk)
            merged = ''.join(work)
            # emit every complete response; the tail stays in ``merged``
            while ';' in merged:
                one, merged = merged.split(';', 1)
                yield unescapearg(one)
            chunk = rsp.read(1024)
            work = [merged, chunk]
        # the last response is not followed by a separator
        yield unescapearg(''.join(work))

    def _submitone(self, op, args):
        """Run ``op`` through ``_call`` with ``args`` as keyword arguments."""
        return self._call(op, **pycompat.strkwargs(args))

    def debugwireargs(self, one, two, three=None, four=None, five=None):
        """Call the ``debugwireargs`` command and return its reply.

        NOTE(review): ``five`` is accepted but never sent - confirm this
        is intentional.
        """
        # don't pass optional arguments left at their default value
        opts = {}
        if three is not None:
            opts[r'three'] = three
        if four is not None:
            opts[r'four'] = four
        return self._call('debugwireargs', one=one, two=two, **opts)

    def _call(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a simple string.

        returns the server reply as a string."""
        raise NotImplementedError()

    def _callstream(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream. Note that if the
        command doesn't return a stream, _callstream behaves
        differently for ssh and http peers.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callcompressable(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream.

        The stream may have been compressed in some implementations. This
        function takes care of the decompression. This is the only difference
        with _callstream.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callpush(self, cmd, fp, **args):
        """execute a <cmd> on server

        The command is expected to be related to a push. Push has a special
        return method.

        returns the server reply as a (ret, output) tuple. ret is either
        empty (error) or a stringified int.
        """
        raise NotImplementedError()

    def _calltwowaystream(self, cmd, fp, **args):
        """execute <cmd> on server

        The command will send a stream to the server and get a stream in reply.
        """
        raise NotImplementedError()

    def _abort(self, exception):
        """clearly abort the wire protocol connection and raise the exception
        """
        raise NotImplementedError()