Mercurial > hg
comparison mercurial/wireprotov1peer.py @ 37630:e1b32dc4646c
wireproto: implement command executor interface for version 1 peers
Now that we've defined our new interface for issuing commands,
let's implement it.
We add the interface to the base peer interface. This means all
peer types must implement it.
The only peer types that we have are the local peer in localrepo
and a shared wire peer for version 1 of the wire protocol.
The local peer implementation is pretty straightforward. We
don't do anything fancy and just return a resolved future with
the result of a method call. This is similar to what
localiterbatcher does.
The wire protocol version 1 implementation is a bit more complicated
and is a more robust implementation.
The wire executor queues commands by default. And because the new
executor interface always allows multiple commands but not all version
1 commands are @batchable, it has to check that the requested commands
are batchable if multiple commands are being requested.
The wire executor currently only supports executing a single command.
This is for simplicity reasons. Support for multiple commands will
be added in a separate commit.
To prove the new interface works, a call to the "known" command
during discovery has been updated to use the new API.
It's worth noting that both implementations require a method having
the command name to exist on the peer. There is at least one caller
in core that don't have a method calls peer._call() directly. We
may need to shore up the requirements later...
Differential Revision: https://phab.mercurial-scm.org/D3268
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Fri, 13 Apr 2018 10:51:23 -0700 |
parents | f3dc8239e3a9 |
children | 2f626233859b |
comparison
equal
deleted
inserted
replaced
37629:fa0382088993 | 37630:e1b32dc4646c |
---|---|
6 # GNU General Public License version 2 or any later version. | 6 # GNU General Public License version 2 or any later version. |
7 | 7 |
8 from __future__ import absolute_import | 8 from __future__ import absolute_import |
9 | 9 |
10 import hashlib | 10 import hashlib |
11 import sys | |
11 | 12 |
12 from .i18n import _ | 13 from .i18n import _ |
13 from .node import ( | 14 from .node import ( |
14 bin, | 15 bin, |
15 ) | 16 ) |
16 | 17 from .thirdparty.zope import ( |
18 interface as zi, | |
19 ) | |
17 from . import ( | 20 from . import ( |
18 bundle2, | 21 bundle2, |
19 changegroup as changegroupmod, | 22 changegroup as changegroupmod, |
20 encoding, | 23 encoding, |
21 error, | 24 error, |
175 for k, v in argsdict.iteritems()) | 178 for k, v in argsdict.iteritems()) |
176 cmds.append('%s %s' % (op, args)) | 179 cmds.append('%s %s' % (op, args)) |
177 | 180 |
178 return ';'.join(cmds) | 181 return ';'.join(cmds) |
179 | 182 |
183 @zi.implementer(repository.ipeercommandexecutor) | |
184 class peerexecutor(object): | |
185 def __init__(self, peer): | |
186 self._peer = peer | |
187 self._sent = False | |
188 self._closed = False | |
189 self._calls = [] | |
190 | |
191 def __enter__(self): | |
192 return self | |
193 | |
194 def __exit__(self, exctype, excvalee, exctb): | |
195 self.close() | |
196 | |
197 def callcommand(self, command, args): | |
198 if self._sent: | |
199 raise error.ProgrammingError('callcommand() cannot be used ' | |
200 'after commands are sent') | |
201 | |
202 if self._closed: | |
203 raise error.ProgrammingError('callcommand() cannot be used ' | |
204 'after close()') | |
205 | |
206 # Commands are dispatched through methods on the peer. | |
207 fn = getattr(self._peer, pycompat.sysstr(command), None) | |
208 | |
209 if not fn: | |
210 raise error.ProgrammingError( | |
211 'cannot call command %s: method of same name not available ' | |
212 'on peer' % command) | |
213 | |
214 # Commands are either batchable or they aren't. If a command | |
215 # isn't batchable, we send it immediately because the executor | |
216 # can no longer accept new commands after a non-batchable command. | |
217 # If a command is batchable, we queue it for later. | |
218 | |
219 if getattr(fn, 'batchable', False): | |
220 pass | |
221 else: | |
222 if self._calls: | |
223 raise error.ProgrammingError( | |
224 '%s is not batchable and cannot be called on a command ' | |
225 'executor along with other commands' % command) | |
226 | |
227 # We don't support batching yet. So resolve it immediately. | |
228 f = pycompat.futures.Future() | |
229 self._calls.append((command, args, fn, f)) | |
230 self.sendcommands() | |
231 return f | |
232 | |
233 def sendcommands(self): | |
234 if self._sent: | |
235 return | |
236 | |
237 if not self._calls: | |
238 return | |
239 | |
240 self._sent = True | |
241 | |
242 calls = self._calls | |
243 # Mainly to destroy references to futures. | |
244 self._calls = None | |
245 | |
246 if len(calls) == 1: | |
247 command, args, fn, f = calls[0] | |
248 | |
249 # Future was cancelled. Ignore it. | |
250 if not f.set_running_or_notify_cancel(): | |
251 return | |
252 | |
253 try: | |
254 result = fn(**pycompat.strkwargs(args)) | |
255 except Exception: | |
256 f.set_exception_info(*sys.exc_info()[1:]) | |
257 else: | |
258 f.set_result(result) | |
259 | |
260 return | |
261 | |
262 raise error.ProgrammingError('support for multiple commands not ' | |
263 'yet implemented') | |
264 | |
265 def close(self): | |
266 self.sendcommands() | |
267 | |
268 self._closed = True | |
269 | |
180 class wirepeer(repository.legacypeer): | 270 class wirepeer(repository.legacypeer): |
181 """Client-side interface for communicating with a peer repository. | 271 """Client-side interface for communicating with a peer repository. |
182 | 272 |
183 Methods commonly call wire protocol commands of the same name. | 273 Methods commonly call wire protocol commands of the same name. |
184 | 274 |
185 See also httppeer.py and sshpeer.py for protocol-specific | 275 See also httppeer.py and sshpeer.py for protocol-specific |
186 implementations of this interface. | 276 implementations of this interface. |
187 """ | 277 """ |
278 def commandexecutor(self): | |
279 return peerexecutor(self) | |
280 | |
188 # Begin of ipeercommands interface. | 281 # Begin of ipeercommands interface. |
189 | 282 |
190 def iterbatch(self): | 283 def iterbatch(self): |
191 return remoteiterbatcher(self) | 284 return remoteiterbatcher(self) |
192 | 285 |