comparison mercurial/wireprotov1peer.py @ 37630:e1b32dc4646c

wireproto: implement command executor interface for version 1 peers. Now that we've defined our new interface for issuing commands, let's implement it. We add the interface to the base peer interface. This means all peer types must implement it. The only peer types that we have are the local peer in localrepo and a shared wire peer for version 1 of the wire protocol. The local peer implementation is pretty straightforward. We don't do anything fancy and just return a resolved future with the result of a method call. This is similar to what localiterbatcher does. The wire protocol version 1 implementation is a bit more complicated and is a more robust implementation. The wire executor queues commands by default. And because the new executor interface always allows multiple commands but not all version 1 commands are @batchable, it has to check that the requested commands are batchable if multiple commands are being requested. The wire executor currently only supports executing a single command. This is for simplicity reasons. Support for multiple commands will be added in a separate commit. To prove the new interface works, a call to the "known" command during discovery has been updated to use the new API. It's worth noting that both implementations require a method having the command name to exist on the peer. There is at least one caller in core that doesn't have such a method and calls peer._call() directly. We may need to shore up the requirements later... Differential Revision: https://phab.mercurial-scm.org/D3268
author Gregory Szorc <gregory.szorc@gmail.com>
date Fri, 13 Apr 2018 10:51:23 -0700
parents f3dc8239e3a9
children 2f626233859b
comparison
equal deleted inserted replaced
37629:fa0382088993 37630:e1b32dc4646c
6 # GNU General Public License version 2 or any later version. 6 # GNU General Public License version 2 or any later version.
7 7
8 from __future__ import absolute_import 8 from __future__ import absolute_import
9 9
10 import hashlib 10 import hashlib
11 import sys
11 12
12 from .i18n import _ 13 from .i18n import _
13 from .node import ( 14 from .node import (
14 bin, 15 bin,
15 ) 16 )
16 17 from .thirdparty.zope import (
18 interface as zi,
19 )
17 from . import ( 20 from . import (
18 bundle2, 21 bundle2,
19 changegroup as changegroupmod, 22 changegroup as changegroupmod,
20 encoding, 23 encoding,
21 error, 24 error,
175 for k, v in argsdict.iteritems()) 178 for k, v in argsdict.iteritems())
176 cmds.append('%s %s' % (op, args)) 179 cmds.append('%s %s' % (op, args))
177 180
178 return ';'.join(cmds) 181 return ';'.join(cmds)
179 182
@zi.implementer(repository.ipeercommandexecutor)
class peerexecutor(object):
    """Command executor that dispatches commands to methods on a peer.

    Each command is resolved to the method of the same name on the peer
    given to the constructor, and its outcome is delivered through a
    future. Batching is not implemented yet: a command is sent as soon as
    it is requested, so an instance can execute a single command only.

    Instances can be used as context managers; leaving the ``with`` block
    calls ``close()``.
    """

    def __init__(self, peer):
        self._peer = peer
        # True once sendcommands() has started dispatching queued calls.
        self._sent = False
        # True once close() has been called.
        self._closed = False
        # Pending (command, args, fn, future) tuples; reset to None once
        # the calls have been sent.
        self._calls = []

    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalee, exctb):
        self.close()

    def callcommand(self, command, args):
        """Request execution of ``command`` with the ``args`` dict.

        Returns a future that holds the result of the peer method, or the
        exception it raised.

        Raises ``error.ProgrammingError`` if commands were already sent,
        if the executor was closed, if the peer has no method named after
        the command, or if a non-batchable command is requested while
        other commands are queued.
        """
        if self._sent:
            raise error.ProgrammingError('callcommand() cannot be used '
                                         'after commands are sent')

        if self._closed:
            raise error.ProgrammingError('callcommand() cannot be used '
                                         'after close()')

        # Commands are dispatched through methods on the peer.
        fn = getattr(self._peer, pycompat.sysstr(command), None)

        if not fn:
            raise error.ProgrammingError(
                'cannot call command %s: method of same name not available '
                'on peer' % command)

        # Commands are either batchable or they aren't. A non-batchable
        # command cannot share the executor with other queued commands.
        if not getattr(fn, 'batchable', False) and self._calls:
            raise error.ProgrammingError(
                '%s is not batchable and cannot be called on a command '
                'executor along with other commands' % command)

        # We don't support batching yet. So resolve it immediately.
        f = pycompat.futures.Future()
        self._calls.append((command, args, fn, f))
        self.sendcommands()
        return f

    def sendcommands(self):
        """Dispatch the queued command and resolve its future.

        Does nothing if commands were already sent or none are queued.
        Raises ``error.ProgrammingError`` if more than one command is
        queued, since batching is not implemented.
        """
        if self._sent:
            return

        if not self._calls:
            return

        self._sent = True

        calls = self._calls
        # Mainly to destroy references to futures.
        self._calls = None

        if len(calls) == 1:
            command, args, fn, f = calls[0]

            # Future was cancelled. Ignore it.
            if not f.set_running_or_notify_cancel():
                return

            try:
                result = fn(**pycompat.strkwargs(args))
            except Exception:
                exc, tb = sys.exc_info()[1:]
                # set_exception_info() only exists on the Python 2
                # ``futures`` backport. The standard library Future only
                # has set_exception(); there the traceback already
                # travels on the exception object itself.
                setexcinfo = getattr(f, 'set_exception_info', None)
                if setexcinfo is not None:
                    setexcinfo(exc, tb)
                else:
                    f.set_exception(exc)
            else:
                f.set_result(result)

            return

        raise error.ProgrammingError('support for multiple commands not '
                                     'yet implemented')

    def close(self):
        """Send any queued command, then refuse further callcommand()."""
        self.sendcommands()

        self._closed = True
269
180 class wirepeer(repository.legacypeer): 270 class wirepeer(repository.legacypeer):
181 """Client-side interface for communicating with a peer repository. 271 """Client-side interface for communicating with a peer repository.
182 272
183 Methods commonly call wire protocol commands of the same name. 273 Methods commonly call wire protocol commands of the same name.
184 274
185 See also httppeer.py and sshpeer.py for protocol-specific 275 See also httppeer.py and sshpeer.py for protocol-specific
186 implementations of this interface. 276 implementations of this interface.
187 """ 277 """
278 def commandexecutor(self):
279 return peerexecutor(self)
280
188 # Begin of ipeercommands interface. 281 # Begin of ipeercommands interface.
189 282
190 def iterbatch(self): 283 def iterbatch(self):
191 return remoteiterbatcher(self) 284 return remoteiterbatcher(self)
192 285