|
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1. |
|
2 # |
|
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com> |
|
4 # |
|
5 # This software may be used and distributed according to the terms of the |
|
6 # GNU General Public License version 2 or any later version. |
|
7 |
|
8 from __future__ import absolute_import |
|
9 |
|
10 import hashlib |
|
11 |
|
12 from .i18n import _ |
|
13 from .node import ( |
|
14 bin, |
|
15 ) |
|
16 |
|
17 from . import ( |
|
18 bundle2, |
|
19 changegroup as changegroupmod, |
|
20 encoding, |
|
21 error, |
|
22 peer, |
|
23 pushkey as pushkeymod, |
|
24 pycompat, |
|
25 repository, |
|
26 util, |
|
27 wireprototypes, |
|
28 ) |
|
29 |
|
# Module-level shorthand for util.urlreq; branchmap() uses it to unquote
# branch names received from the server.
urlreq = util.urlreq
|
31 |
|
class remoteiterbatcher(peer.iterbatcher):
    """Iterating batcher bound to a remote wire peer.

    ``submit()`` encodes the queued calls and hands them to the remote's
    ``_submitbatch()``; ``results()`` decodes the replies one at a time.
    """

    def __init__(self, remote):
        super(remoteiterbatcher, self).__init__()
        self._remote = remote

    def __getattr__(self, name):
        # submit() drives the generator stored in the ``batchable``
        # attribute of the remote method, so anything lacking that
        # attribute cannot be queued.
        method = getattr(self._remote, name)
        if getattr(method, 'batchable', None):
            return super(remoteiterbatcher, self).__getattr__(name)

        raise error.ProgrammingError('Attempted to batch a non-batchable '
                                     'call to %r' % name)

    def submit(self):
        """Encode every queued call and send them to the remote as a batch.

        Nothing is sent when no call was queued. The bookkeeping needed to
        decode the replies is stored for ``results()``.
        """
        # (command, arguments) pairs: what actually goes over the wire.
        wirecalls = []

        # (command, final future, @batchable generator, remote future)
        # entries, one per queued call, consumed by results().
        pending = []

        for command, args, opts, finalfuture in self.calls:
            method = getattr(self._remote, command)
            gen = method.batchable(method.__self__, *args, **opts)

            # First value out of the generator: the encoded arguments and
            # the future that will later receive the raw server reply.
            encodedargs, remotefuture = next(gen)
            assert remotefuture
            wirecalls.append((command, encodedargs))
            pending.append((command, finalfuture, gen, remotefuture))

        if wirecalls:
            self._resultiter = self._remote._submitbatch(wirecalls)

        self._results = pending

    def results(self):
        """Yield the decoded value of each submitted call, in call order."""
        exhausted = object()

        for command, finalfuture, gen, remotefuture in self._results:
            # Hand the raw reply to the remote future, resume the
            # @batchable generator so it can decode it, and store the
            # decoded value on the final future.
            remotefuture.set(next(self._resultiter))
            finalfuture.set(next(gen))

            # A @batchable generator must emit exactly two values.
            if next(gen, exhausted) is not exhausted:
                raise error.ProgrammingError('%s @batchable generator emitted '
                                             'unexpected value count' % command)

            yield finalfuture.value
|
94 |
|
# Re-export two names from the peer module so that wire protocol code can
# refer to them directly.
future = peer.future
batchable = peer.batchable
|
99 |
|
def encodebatchcmds(req):
    """Return a ``cmds`` argument value for the ``batch`` command.

    ``req`` is an iterable of ``(command, argsdict)`` pairs. Each pair is
    rendered as ``command name=value,name=value`` and the pairs are joined
    with ``;``.
    """
    escape = wireprototypes.escapebatcharg

    def encodeone(command, arguments):
        # Old servers didn't properly unescape argument names, so refuse
        # to send any name that escaping would alter.
        assert all(escape(name) == name for name in arguments)

        pairs = ['%s=%s' % (escape(name), escape(value))
                 for name, value in arguments.iteritems()]
        return '%s %s' % (command, ','.join(pairs))

    return ';'.join([encodeone(op, argsdict) for op, argsdict in req])
|
116 |
|
class wirepeer(repository.legacypeer):
    """Client-side interface for communicating with a peer repository.

    Methods commonly call wire protocol commands of the same name.

    See also httppeer.py and sshpeer.py for protocol-specific
    implementations of this interface.

    Subclasses must implement the ``_call*`` transport primitives and
    ``_abort()``, all of which raise NotImplementedError here.
    """
    # Begin of ipeercommands interface.

    def iterbatch(self):
        """Return a batcher that queues @batchable calls against this peer."""
        return remoteiterbatcher(self)

    @batchable
    def lookup(self, key):
        """Resolve ``key`` on the remote and yield the binary node.

        Aborts with RepoError when the server reports a failure, and with
        ResponseError when the reply is not of the form
        ``<int> <data>`` followed by one trailing character.
        """
        self.requirecap('lookup', _('look up remote revision'))
        f = future()
        yield {'key': encoding.fromlocal(key)}, f
        d = f.value
        try:
            # The last character of the reply is dropped before parsing.
            success, data = d[:-1].split(" ", 1)
            success = int(success)
        except ValueError:
            # No separator or a non-integer status: malformed reply.
            self._abort(error.ResponseError(_("unexpected response:"), d))
        if success:
            yield bin(data)
        else:
            self._abort(error.RepoError(data))

    @batchable
    def heads(self):
        """Yield the list of head nodes decoded from the server reply."""
        f = future()
        yield {}, f
        d = f.value
        try:
            yield wireprototypes.decodelist(d[:-1])
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def known(self, nodes):
        """Yield one boolean per element of the reply, each parsed as int."""
        f = future()
        yield {'nodes': wireprototypes.encodelist(nodes)}, f
        d = f.value
        try:
            yield [bool(int(b)) for b in d]
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def branchmap(self):
        """Yield a dict mapping local branch names to lists of heads.

        Each reply line is ``<quoted branch name> <encoded heads>``.
        Aborts with ResponseError when a line cannot be parsed.
        """
        f = future()
        yield {}, f
        d = f.value
        try:
            branchmap = {}
            for branchpart in d.splitlines():
                branchname, branchheads = branchpart.split(' ', 1)
                branchname = encoding.tolocal(urlreq.unquote(branchname))
                branchheads = wireprototypes.decodelist(branchheads)
                branchmap[branchname] = branchheads
            yield branchmap
        except (TypeError, ValueError):
            # ValueError covers a line without the separating space (the
            # two-name unpacking fails); TypeError was already handled.
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def listkeys(self, namespace):
        """Yield the keys of ``namespace`` decoded from the server reply.

        When the peer lacks the ``pushkey`` capability, ``{}`` is yielded
        together with a None future instead of a request.
        NOTE(review): presumably the @batchable wrapper treats a None
        future as "local result, no request" - confirm in peer.batchable.
        remoteiterbatcher.submit() asserts the future is truthy, so this
        path cannot be batched through it.
        """
        if not self.capable('pushkey'):
            yield {}, None
        f = future()
        self.ui.debug('preparing listkeys for "%s"\n' % namespace)
        yield {'namespace': encoding.fromlocal(namespace)}, f
        d = f.value
        self.ui.debug('received listkey for "%s": %i bytes\n'
                      % (namespace, len(d)))
        yield pushkeymod.decodekeys(d)

    @batchable
    def pushkey(self, namespace, key, old, new):
        """Update ``key`` in ``namespace`` from ``old`` to ``new``.

        Yields the boolean parsed from the first line of the reply; the
        remaining lines are shown to the user prefixed with ``remote: ``.
        Raises ResponseError when the first line is missing or is not an
        integer.

        When the peer lacks the ``pushkey`` capability, False is yielded
        together with a None future instead of a request.
        """
        if not self.capable('pushkey'):
            yield False, None
        f = future()
        self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
        yield {'namespace': encoding.fromlocal(namespace),
               'key': encoding.fromlocal(key),
               'old': encoding.fromlocal(old),
               'new': encoding.fromlocal(new)}, f
        d = f.value
        try:
            # A reply without any newline makes the unpacking fail; treat
            # it like a non-integer status instead of leaking ValueError.
            d, output = d.split('\n', 1)
            d = bool(int(d))
        except ValueError:
            raise error.ResponseError(
                _('push failed (unexpected response):'), d)
        for l in output.splitlines(True):
            self.ui.status(_('remote: '), l)
        yield d

    def stream_out(self):
        """Return the stream produced by the ``stream_out`` command."""
        return self._callstream('stream_out')

    def getbundle(self, source, **kwargs):
        """Fetch a bundle from the remote and return an unbundler for it.

        Keyword arguments whose value is None are dropped. The others are
        encoded according to their type in
        ``wireprototypes.GETBUNDLE_ARGUMENTS``; an argument missing from
        that table raises ProgrammingError and an unrecognized type raises
        KeyError.

        A bundle2 unbundler is returned when any requested bundle
        capability starts with ``HG2``; otherwise a cg1unpacker is
        returned. ``source`` is not used by this method.
        """
        kwargs = pycompat.byteskwargs(kwargs)
        self.requirecap('getbundle', _('look up remote changes'))
        opts = {}
        bundlecaps = kwargs.get('bundlecaps') or set()
        for key, value in kwargs.iteritems():
            if value is None:
                continue
            keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
            if keytype is None:
                raise error.ProgrammingError(
                    'Unexpectedly None keytype for key %s' % key)
            elif keytype == 'nodes':
                value = wireprototypes.encodelist(value)
            elif keytype == 'csv':
                value = ','.join(value)
            elif keytype == 'scsv':
                # Same as csv, but with a deterministic ordering.
                value = ','.join(sorted(value))
            elif keytype == 'boolean':
                value = '%i' % bool(value)
            elif keytype != 'plain':
                raise KeyError('unknown getbundle option type %s'
                               % keytype)
            opts[key] = value
        f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
        if any(cap.startswith('HG2') for cap in bundlecaps):
            return bundle2.getunbundler(self.ui, f)
        else:
            return changegroupmod.cg1unpacker(f, 'UN')

    def unbundle(self, cg, heads, url):
        '''Send cg (a readable file-like object representing the
        changegroup to push, typically a chunkbuffer object) to the
        remote server as a bundle.

        When pushing a bundle10 stream, return an integer indicating the
        result of the push (see changegroup.apply()).

        When pushing a bundle20 stream, return a bundle20 stream.

        `url` is the url the client thinks it's pushing to, which is
        visible to hooks.
        '''

        if heads != ['force'] and self.capable('unbundlehash'):
            # Send a SHA-1 digest of the sorted heads rather than the
            # heads themselves.
            heads = wireprototypes.encodelist(
                ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
        else:
            heads = wireprototypes.encodelist(heads)

        if util.safehasattr(cg, 'deltaheader'):
            # this a bundle10, do the old style call sequence
            ret, output = self._callpush("unbundle", cg, heads=heads)
            if ret == "":
                raise error.ResponseError(
                    _('push failed:'), output)
            try:
                ret = int(ret)
            except ValueError:
                raise error.ResponseError(
                    _('push failed (unexpected response):'), ret)

            for l in output.splitlines(True):
                self.ui.status(_('remote: '), l)
        else:
            # bundle2 push. Send a stream, fetch a stream.
            stream = self._calltwowaystream('unbundle', cg, heads=heads)
            ret = bundle2.getunbundler(self.ui, stream)
        return ret

    # End of ipeercommands interface.

    # Begin of ipeerlegacycommands interface.

    def branches(self, nodes):
        """Return one tuple of decoded nodes per line of the reply."""
        n = wireprototypes.encodelist(nodes)
        d = self._call("branches", nodes=n)
        try:
            br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
            return br
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    def between(self, pairs):
        """Return one decoded node list per pair, in ``pairs`` order.

        The pairs are sent in groups of eight; an empty reply line is
        turned into an empty list.
        """
        batch = 8 # avoid giant requests
        r = []
        for i in xrange(0, len(pairs), batch):
            n = " ".join([wireprototypes.encodelist(p, '-')
                          for p in pairs[i:i + batch]])
            d = self._call("between", pairs=n)
            try:
                r.extend(wireprototypes.decodelist(l) if l else []
                         for l in d.splitlines())
            except ValueError:
                self._abort(error.ResponseError(_("unexpected response:"), d))
        return r

    def changegroup(self, nodes, kind):
        """Return a cg1unpacker over the changegroup rooted at ``nodes``.

        ``kind`` is not used by this method.
        """
        n = wireprototypes.encodelist(nodes)
        f = self._callcompressable("changegroup", roots=n)
        return changegroupmod.cg1unpacker(f, 'UN')

    def changegroupsubset(self, bases, heads, kind):
        """Return a cg1unpacker for the given ``bases`` and ``heads``.

        ``kind`` is not used by this method.
        """
        self.requirecap('changegroupsubset', _('look up remote changes'))
        bases = wireprototypes.encodelist(bases)
        heads = wireprototypes.encodelist(heads)
        f = self._callcompressable("changegroupsubset",
                                   bases=bases, heads=heads)
        return changegroupmod.cg1unpacker(f, 'UN')

    # End of ipeerlegacycommands interface.

    def _submitbatch(self, req):
        """run batch request <req> on the server

        Returns an iterator of the raw responses from the server.
        """
        ui = self.ui
        if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
            ui.debug('devel-peer-request: batched-content\n')
            for op, args in req:
                msg = 'devel-peer-request:    - %s (%d arguments)\n'
                ui.debug(msg % (op, len(args)))

        unescapearg = wireprototypes.unescapebatcharg

        rsp = self._callstream("batch", cmds=encodebatchcmds(req))
        chunk = rsp.read(1024)
        work = [chunk]
        while chunk:
            # Keep reading until the latest chunk contains a ';' separator
            # or the stream is exhausted.
            while ';' not in chunk and chunk:
                chunk = rsp.read(1024)
                work.append(chunk)
            merged = ''.join(work)
            # Emit every complete response; the text after the last ';'
            # is carried over as the beginning of the next response.
            while ';' in merged:
                one, merged = merged.split(';', 1)
                yield unescapearg(one)
            chunk = rsp.read(1024)
            work = [merged, chunk]
        # Whatever remains once the stream is drained is the last response.
        yield unescapearg(''.join(work))

    def _submitone(self, op, args):
        """Run the single command ``op`` with the ``args`` dict via _call."""
        return self._call(op, **pycompat.strkwargs(args))

    def debugwireargs(self, one, two, three=None, four=None, five=None):
        """Call ``debugwireargs`` on the server and return its reply.

        NOTE(review): ``five`` is accepted but never forwarded to the
        server - presumably on purpose; confirm before changing.
        """
        # don't pass optional arguments left at their default value
        opts = {}
        if three is not None:
            opts[r'three'] = three
        if four is not None:
            opts[r'four'] = four
        return self._call('debugwireargs', one=one, two=two, **opts)

    def _call(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a simple string.

        returns the server reply as a string."""
        raise NotImplementedError()

    def _callstream(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream. Note that if the
        command doesn't return a stream, _callstream behaves
        differently for ssh and http peers.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callcompressable(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream.

        The stream may have been compressed in some implementations. This
        function takes care of the decompression. This is the only difference
        with _callstream.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callpush(self, cmd, fp, **args):
        """execute a <cmd> on server

        The command is expected to be related to a push. Push has a special
        return method.

        returns the server reply as a (ret, output) tuple. ret is either
        empty (error) or a stringified int.
        """
        raise NotImplementedError()

    def _calltwowaystream(self, cmd, fp, **args):
        """execute <cmd> on server

        The command will send a stream to the server and get a stream in reply.
        """
        raise NotImplementedError()

    def _abort(self, exception):
        """clearly abort the wire protocol connection and raise the exception
        """
        raise NotImplementedError()