comparison mercurial/wireprotov1peer.py @ 37614:a81d02ea65db

wireproto: move version 1 peer functionality to standalone module (API). wireproto.py contains code for both the client and the server. There *should* be a somewhat strong separation between the two. This commit extracts the client-side code from wireproto.py into a new module - wireprotov1peer. Differential Revision: https://phab.mercurial-scm.org/D3259
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 11 Apr 2018 12:49:08 -0700
parents mercurial/wireproto.py@96d735601ca1
children f3dc8239e3a9
comparison
equal deleted inserted replaced
37613:96d735601ca1 37614:a81d02ea65db
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7
8 from __future__ import absolute_import
9
10 import hashlib
11
12 from .i18n import _
13 from .node import (
14 bin,
15 )
16
17 from . import (
18 bundle2,
19 changegroup as changegroupmod,
20 encoding,
21 error,
22 peer,
23 pushkey as pushkeymod,
24 pycompat,
25 repository,
26 util,
27 wireprototypes,
28 )
29
30 urlreq = util.urlreq
31
class remoteiterbatcher(peer.iterbatcher):
    """Batcher that sends queued @batchable calls to a remote peer.

    Calls are queued through attribute access (validated by
    ``__getattr__``), encoded and sent by ``submit()``, and decoded lazily
    by ``results()``.
    """

    def __init__(self, remote):
        super(remoteiterbatcher, self).__init__()
        self._remote = remote

    def __getattr__(self, name):
        # Validate this method is batchable, since submit() only supports
        # batchable methods.
        fn = getattr(self._remote, name)
        if not getattr(fn, 'batchable', None):
            raise error.ProgrammingError('Attempted to batch a non-batchable '
                                         'call to %r' % name)

        return super(remoteiterbatcher, self).__getattr__(name)

    def submit(self):
        """Encode every queued call and hand them to the remote as a batch.

        Each queued call's @batchable generator is advanced once to obtain
        its encoded arguments and the future that will receive the raw
        response. Calls whose generator yields a falsy future carry their
        final result directly (e.g. ``listkeys`` when the server lacks the
        ``pushkey`` capability); those are resolved locally and are not
        sent over the wire.
        """
        # 2-tuple of (command, arguments) that represents what will be
        # sent over the wire.
        requests = []

        # 4-tuple of (command, final future, @batchable generator, remote
        # future). The last two entries are None for locally resolved calls.
        results = []

        for command, args, opts, finalfuture in self.calls:
            mtd = getattr(self._remote, command)
            batchable = mtd.batchable(mtd.__self__, *args, **opts)

            commandargs, fremote = next(batchable)
            if not fremote:
                # No remote future: the first yielded value is the result
                # itself, so there is nothing to send or decode.
                finalfuture.set(commandargs)
                results.append((command, finalfuture, None, None))
                continue

            requests.append((command, commandargs))
            results.append((command, finalfuture, batchable, fremote))

        # Only talk to the remote when at least one call needs it.
        if requests:
            self._resultiter = self._remote._submitbatch(requests)

        self._results = results

    def results(self):
        """Yield the decoded result of each submitted call, in call order.

        Raises ``error.ProgrammingError`` if a @batchable generator emits
        more than two values.
        """
        for command, finalfuture, batchable, remotefuture in self._results:
            if batchable is None:
                # Resolved locally in submit(); no wire response to consume.
                yield finalfuture.value
                continue

            # Get the raw result, set it in the remote future, feed it
            # back into the @batchable generator so it can be decoded, and
            # set the result on the final future to this value.
            remoteresult = next(self._resultiter)
            remotefuture.set(remoteresult)
            finalfuture.set(next(batchable))

            # Verify our @batchable generators only emit 2 values.
            try:
                next(batchable)
            except StopIteration:
                pass
            else:
                raise error.ProgrammingError('%s @batchable generator emitted '
                                             'unexpected value count' % command)

            yield finalfuture.value
94
# Re-export the batching primitives from peer under this module's namespace
# to make wireproto interactions slightly more sensible.
batchable, future = peer.batchable, peer.future
99
def encodebatchcmds(req):
    """Build the ``cmds`` argument value sent with the ``batch`` command.

    ``req`` is an iterable of ``(command name, arguments dict)`` pairs. Each
    command is rendered as ``name key=value,key=value`` with keys and values
    escaped, and the commands are joined with ``;``.
    """
    escape = wireprototypes.escapebatcharg

    def encodeone(name, arguments):
        # Old servers didn't properly unescape argument names, so refuse to
        # send any argument name that escaping would alter.
        assert all(escape(argname) == argname for argname in arguments)

        pairs = ['%s=%s' % (escape(argname), escape(argvalue))
                 for argname, argvalue in arguments.iteritems()]
        return '%s %s' % (name, ','.join(pairs))

    return ';'.join([encodeone(name, arguments) for name, arguments in req])
116
class wirepeer(repository.legacypeer):
    """Client-side interface for communicating with a peer repository.

    Methods commonly call wire protocol commands of the same name.

    See also httppeer.py and sshpeer.py for protocol-specific
    implementations of this interface.
    """
    # Begin of ipeercommands interface.

    def iterbatch(self):
        """Return a ``remoteiterbatcher`` bound to this peer."""
        return remoteiterbatcher(self)

    @batchable
    def lookup(self, key):
        """Look up ``key`` on the remote and yield the binary node.

        Calls ``self._abort()`` with an ``error.RepoError`` carrying the
        response payload when the response's status flag is zero.
        """
        self.requirecap('lookup', _('look up remote revision'))
        f = future()
        yield {'key': encoding.fromlocal(key)}, f
        d = f.value
        # Drop the final byte, then split the status flag from the payload.
        success, data = d[:-1].split(" ", 1)
        if int(success):
            yield bin(data)
        else:
            self._abort(error.RepoError(data))

    @batchable
    def heads(self):
        """Yield the list decoded from the remote's ``heads`` response."""
        f = future()
        yield {}, f
        d = f.value
        try:
            yield wireprototypes.decodelist(d[:-1])
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def known(self, nodes):
        """Yield one boolean per element of the remote's ``known`` reply."""
        f = future()
        yield {'nodes': wireprototypes.encodelist(nodes)}, f
        d = f.value
        try:
            # Each element of the response is parsed as an integer flag.
            yield [bool(int(b)) for b in d]
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def branchmap(self):
        """Yield a dict mapping local branch names to decoded head lists.

        Each response line is ``<quoted branch name> <encoded heads>``.
        Malformed responses are reported through ``self._abort()`` with an
        ``error.ResponseError``.
        """
        f = future()
        yield {}, f
        d = f.value
        try:
            branchmap = {}
            for branchpart in d.splitlines():
                branchname, branchheads = branchpart.split(' ', 1)
                branchname = encoding.tolocal(urlreq.unquote(branchname))
                branchheads = wireprototypes.decodelist(branchheads)
                branchmap[branchname] = branchheads
            yield branchmap
        except (TypeError, ValueError):
            # ValueError covers a line with no space (failed unpacking);
            # the sibling commands treat it as a malformed response too.
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def listkeys(self, namespace):
        """Yield the decoded key/value listing for ``namespace``.

        When the remote lacks the ``pushkey`` capability, an empty dict is
        yielded paired with ``None`` in place of a future.
        """
        if not self.capable('pushkey'):
            # NOTE(review): the None future appears to mark ``{}`` as the
            # final result; callers are presumably expected not to resume
            # this generator afterwards - confirm against peer.batchable.
            yield {}, None
        f = future()
        self.ui.debug('preparing listkeys for "%s"\n' % namespace)
        yield {'namespace': encoding.fromlocal(namespace)}, f
        d = f.value
        self.ui.debug('received listkey for "%s": %i bytes\n'
                      % (namespace, len(d)))
        yield pushkeymod.decodekeys(d)

    @batchable
    def pushkey(self, namespace, key, old, new):
        """Update ``key`` in ``namespace`` from ``old`` to ``new`` remotely.

        Yields the boolean parsed from the first line of the response; the
        remaining lines are echoed through ``ui.status`` prefixed with
        ``remote: ``. Raises ``error.ResponseError`` when the first line is
        not an integer. When the remote lacks the ``pushkey`` capability,
        ``False`` is yielded paired with ``None`` in place of a future.
        """
        if not self.capable('pushkey'):
            # NOTE(review): the None future appears to mark ``False`` as the
            # final result; callers are presumably expected not to resume
            # this generator afterwards - confirm against peer.batchable.
            yield False, None
        f = future()
        self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
        yield {'namespace': encoding.fromlocal(namespace),
               'key': encoding.fromlocal(key),
               'old': encoding.fromlocal(old),
               'new': encoding.fromlocal(new)}, f
        d = f.value
        # partition() yields an empty output when the response has no
        # newline, instead of failing to unpack; a non-integer status is
        # still rejected below.
        d, output = d.partition('\n')[::2]
        try:
            d = bool(int(d))
        except ValueError:
            raise error.ResponseError(
                _('push failed (unexpected response):'), d)
        for l in output.splitlines(True):
            self.ui.status(_('remote: '), l)
        yield d

    def stream_out(self):
        """Return the stream produced by the ``stream_out`` command."""
        return self._callstream('stream_out')

    def getbundle(self, source, **kwargs):
        """Request a bundle from the remote and return an unbundler for it.

        Keyword arguments whose value is ``None`` are skipped; the others
        are encoded according to ``wireprototypes.GETBUNDLE_ARGUMENTS``.
        Returns a bundle2 unbundler when any requested bundle capability
        starts with ``HG2``, otherwise a ``cg1unpacker``.

        Raises ``error.ProgrammingError`` for a key with no registered type
        and ``KeyError`` for an unrecognized type.
        """
        kwargs = pycompat.byteskwargs(kwargs)
        self.requirecap('getbundle', _('look up remote changes'))
        opts = {}
        bundlecaps = kwargs.get('bundlecaps') or set()
        for key, value in kwargs.iteritems():
            if value is None:
                continue
            keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
            if keytype is None:
                raise error.ProgrammingError(
                    'Unexpectedly None keytype for key %s' % key)
            elif keytype == 'nodes':
                value = wireprototypes.encodelist(value)
            elif keytype == 'csv':
                value = ','.join(value)
            elif keytype == 'scsv':
                value = ','.join(sorted(value))
            elif keytype == 'boolean':
                value = '%i' % bool(value)
            elif keytype != 'plain':
                raise KeyError('unknown getbundle option type %s'
                               % keytype)
            opts[key] = value
        f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
        if any(cap.startswith('HG2') for cap in bundlecaps):
            return bundle2.getunbundler(self.ui, f)
        else:
            return changegroupmod.cg1unpacker(f, 'UN')

    def unbundle(self, cg, heads, url):
        '''Send cg (a readable file-like object representing the
        changegroup to push, typically a chunkbuffer object) to the
        remote server as a bundle.

        When pushing a bundle10 stream, return an integer indicating the
        result of the push (see changegroup.apply()).

        When pushing a bundle20 stream, return a bundle20 stream.

        `url` is the url the client thinks it's pushing to, which is
        visible to hooks.
        '''

        # With the 'unbundlehash' capability, send a SHA-1 digest of the
        # sorted heads instead of the full list (unless forcing).
        if heads != ['force'] and self.capable('unbundlehash'):
            heads = wireprototypes.encodelist(
                ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
        else:
            heads = wireprototypes.encodelist(heads)

        if util.safehasattr(cg, 'deltaheader'):
            # this a bundle10, do the old style call sequence
            ret, output = self._callpush("unbundle", cg, heads=heads)
            if ret == "":
                raise error.ResponseError(
                    _('push failed:'), output)
            try:
                ret = int(ret)
            except ValueError:
                raise error.ResponseError(
                    _('push failed (unexpected response):'), ret)

            for l in output.splitlines(True):
                self.ui.status(_('remote: '), l)
        else:
            # bundle2 push. Send a stream, fetch a stream.
            stream = self._calltwowaystream('unbundle', cg, heads=heads)
            ret = bundle2.getunbundler(self.ui, stream)
        return ret

    # End of ipeercommands interface.

    # Begin of ipeerlegacycommands interface.

    def branches(self, nodes):
        """Return one tuple of decoded nodes per line of the response."""
        n = wireprototypes.encodelist(nodes)
        d = self._call("branches", nodes=n)
        try:
            br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
            return br
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    def between(self, pairs):
        """Return one decoded node list per pair, querying 8 pairs a time.

        An empty response line maps to an empty list.
        """
        batch = 8 # avoid giant requests
        r = []
        for i in xrange(0, len(pairs), batch):
            n = " ".join([wireprototypes.encodelist(p, '-')
                          for p in pairs[i:i + batch]])
            d = self._call("between", pairs=n)
            try:
                r.extend(wireprototypes.decodelist(l) if l else []
                         for l in d.splitlines())
            except ValueError:
                self._abort(error.ResponseError(_("unexpected response:"), d))
        return r

    def changegroup(self, nodes, kind):
        """Return a ``cg1unpacker`` over the remote's ``changegroup`` reply.

        ``kind`` is accepted but not used by this implementation.
        """
        n = wireprototypes.encodelist(nodes)
        f = self._callcompressable("changegroup", roots=n)
        return changegroupmod.cg1unpacker(f, 'UN')

    def changegroupsubset(self, bases, heads, kind):
        """Return a ``cg1unpacker`` over the ``changegroupsubset`` reply.

        ``kind`` is accepted but not used by this implementation.
        """
        self.requirecap('changegroupsubset', _('look up remote changes'))
        bases = wireprototypes.encodelist(bases)
        heads = wireprototypes.encodelist(heads)
        f = self._callcompressable("changegroupsubset",
                                   bases=bases, heads=heads)
        return changegroupmod.cg1unpacker(f, 'UN')

    # End of ipeerlegacycommands interface.

    def _submitbatch(self, req):
        """run batch request <req> on the server

        Returns an iterator of the raw responses from the server.
        """
        ui = self.ui
        if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
            ui.debug('devel-peer-request: batched-content\n')
            for op, args in req:
                msg = 'devel-peer-request:   - %s (%d arguments)\n'
                ui.debug(msg % (op, len(args)))

        unescapearg = wireprototypes.unescapebatcharg

        rsp = self._callstream("batch", cmds=encodebatchcmds(req))
        chunk = rsp.read(1024)
        work = [chunk]
        while chunk:
            # Keep reading until a ';' separator shows up or the stream
            # is exhausted.
            while ';' not in chunk and chunk:
                chunk = rsp.read(1024)
                work.append(chunk)
            merged = ''.join(work)
            # Emit every complete response buffered so far.
            while ';' in merged:
                one, merged = merged.split(';', 1)
                yield unescapearg(one)
            # Carry the incomplete tail over to the next round.
            chunk = rsp.read(1024)
            work = [merged, chunk]
        # Whatever remains after the last separator is the final response.
        yield unescapearg(''.join(work))

    def _submitone(self, op, args):
        """Run the single command ``op`` with the ``args`` mapping."""
        return self._call(op, **pycompat.strkwargs(args))

    def debugwireargs(self, one, two, three=None, four=None, five=None):
        """Call ``debugwireargs`` with the arguments that were supplied.

        NOTE(review): ``five`` is accepted but never forwarded; this looks
        deliberate - confirm before changing.
        """
        # don't pass optional arguments left at their default value
        opts = {}
        if three is not None:
            opts[r'three'] = three
        if four is not None:
            opts[r'four'] = four
        return self._call('debugwireargs', one=one, two=two, **opts)

    def _call(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a simple string.

        returns the server reply as a string."""
        raise NotImplementedError()

    def _callstream(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream. Note that if the
        command doesn't return a stream, _callstream behaves
        differently for ssh and http peers.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callcompressable(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream.

        The stream may have been compressed in some implementations. This
        function takes care of the decompression. This is the only difference
        with _callstream.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callpush(self, cmd, fp, **args):
        """execute a <cmd> on server

        The command is expected to be related to a push. Push has a special
        return method.

        returns the server reply as a (ret, output) tuple. ret is either
        empty (error) or a stringified int.
        """
        raise NotImplementedError()

    def _calltwowaystream(self, cmd, fp, **args):
        """execute <cmd> on server

        The command will send a stream to the server and get a stream in reply.
        """
        raise NotImplementedError()

    def _abort(self, exception):
        """clearly abort the wire protocol connection and raise the exception
        """
        raise NotImplementedError()