mercurial/wireprotov1peer.py
changeset 37614 a81d02ea65db
parent 37613 96d735601ca1
child 37615 f3dc8239e3a9
equal deleted inserted replaced
37613:96d735601ca1 37614:a81d02ea65db
       
     1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
       
     2 #
       
     3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
       
     4 #
       
     5 # This software may be used and distributed according to the terms of the
       
     6 # GNU General Public License version 2 or any later version.
       
     7 
       
     8 from __future__ import absolute_import
       
     9 
       
    10 import hashlib
       
    11 
       
    12 from .i18n import _
       
    13 from .node import (
       
    14     bin,
       
    15 )
       
    16 
       
    17 from . import (
       
    18     bundle2,
       
    19     changegroup as changegroupmod,
       
    20     encoding,
       
    21     error,
       
    22     peer,
       
    23     pushkey as pushkeymod,
       
    24     pycompat,
       
    25     repository,
       
    26     util,
       
    27     wireprototypes,
       
    28 )
       
    29 
       
    30 urlreq = util.urlreq
       
    31 
       
    32 class remoteiterbatcher(peer.iterbatcher):
       
    33     def __init__(self, remote):
       
    34         super(remoteiterbatcher, self).__init__()
       
    35         self._remote = remote
       
    36 
       
    37     def __getattr__(self, name):
       
    38         # Validate this method is batchable, since submit() only supports
       
    39         # batchable methods.
       
    40         fn = getattr(self._remote, name)
       
    41         if not getattr(fn, 'batchable', None):
       
    42             raise error.ProgrammingError('Attempted to batch a non-batchable '
       
    43                                          'call to %r' % name)
       
    44 
       
    45         return super(remoteiterbatcher, self).__getattr__(name)
       
    46 
       
    47     def submit(self):
       
    48         """Break the batch request into many patch calls and pipeline them.
       
    49 
       
    50         This is mostly valuable over http where request sizes can be
       
    51         limited, but can be used in other places as well.
       
    52         """
       
    53         # 2-tuple of (command, arguments) that represents what will be
       
    54         # sent over the wire.
       
    55         requests = []
       
    56 
       
    57         # 4-tuple of (command, final future, @batchable generator, remote
       
    58         # future).
       
    59         results = []
       
    60 
       
    61         for command, args, opts, finalfuture in self.calls:
       
    62             mtd = getattr(self._remote, command)
       
    63             batchable = mtd.batchable(mtd.__self__, *args, **opts)
       
    64 
       
    65             commandargs, fremote = next(batchable)
       
    66             assert fremote
       
    67             requests.append((command, commandargs))
       
    68             results.append((command, finalfuture, batchable, fremote))
       
    69 
       
    70         if requests:
       
    71             self._resultiter = self._remote._submitbatch(requests)
       
    72 
       
    73         self._results = results
       
    74 
       
    75     def results(self):
       
    76         for command, finalfuture, batchable, remotefuture in self._results:
       
    77             # Get the raw result, set it in the remote future, feed it
       
    78             # back into the @batchable generator so it can be decoded, and
       
    79             # set the result on the final future to this value.
       
    80             remoteresult = next(self._resultiter)
       
    81             remotefuture.set(remoteresult)
       
    82             finalfuture.set(next(batchable))
       
    83 
       
    84             # Verify our @batchable generators only emit 2 values.
       
    85             try:
       
    86                 next(batchable)
       
    87             except StopIteration:
       
    88                 pass
       
    89             else:
       
    90                 raise error.ProgrammingError('%s @batchable generator emitted '
       
    91                                              'unexpected value count' % command)
       
    92 
       
    93             yield finalfuture.value
       
    94 
       
    95 # Forward a couple of names from peer to make wireproto interactions
       
    96 # slightly more sensible.
       
    97 batchable = peer.batchable
       
    98 future = peer.future
       
    99 
       
   100 def encodebatchcmds(req):
       
   101     """Return a ``cmds`` argument value for the ``batch`` command."""
       
   102     escapearg = wireprototypes.escapebatcharg
       
   103 
       
   104     cmds = []
       
   105     for op, argsdict in req:
       
   106         # Old servers didn't properly unescape argument names. So prevent
       
   107         # the sending of argument names that may not be decoded properly by
       
   108         # servers.
       
   109         assert all(escapearg(k) == k for k in argsdict)
       
   110 
       
   111         args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
       
   112                         for k, v in argsdict.iteritems())
       
   113         cmds.append('%s %s' % (op, args))
       
   114 
       
   115     return ';'.join(cmds)
       
   116 
       
   117 class wirepeer(repository.legacypeer):
       
   118     """Client-side interface for communicating with a peer repository.
       
   119 
       
   120     Methods commonly call wire protocol commands of the same name.
       
   121 
       
   122     See also httppeer.py and sshpeer.py for protocol-specific
       
   123     implementations of this interface.
       
   124     """
       
   125     # Begin of ipeercommands interface.
       
   126 
       
   127     def iterbatch(self):
       
   128         return remoteiterbatcher(self)
       
   129 
       
   130     @batchable
       
   131     def lookup(self, key):
       
   132         self.requirecap('lookup', _('look up remote revision'))
       
   133         f = future()
       
   134         yield {'key': encoding.fromlocal(key)}, f
       
   135         d = f.value
       
   136         success, data = d[:-1].split(" ", 1)
       
   137         if int(success):
       
   138             yield bin(data)
       
   139         else:
       
   140             self._abort(error.RepoError(data))
       
   141 
       
   142     @batchable
       
   143     def heads(self):
       
   144         f = future()
       
   145         yield {}, f
       
   146         d = f.value
       
   147         try:
       
   148             yield wireprototypes.decodelist(d[:-1])
       
   149         except ValueError:
       
   150             self._abort(error.ResponseError(_("unexpected response:"), d))
       
   151 
       
   152     @batchable
       
   153     def known(self, nodes):
       
   154         f = future()
       
   155         yield {'nodes': wireprototypes.encodelist(nodes)}, f
       
   156         d = f.value
       
   157         try:
       
   158             yield [bool(int(b)) for b in d]
       
   159         except ValueError:
       
   160             self._abort(error.ResponseError(_("unexpected response:"), d))
       
   161 
       
   162     @batchable
       
   163     def branchmap(self):
       
   164         f = future()
       
   165         yield {}, f
       
   166         d = f.value
       
   167         try:
       
   168             branchmap = {}
       
   169             for branchpart in d.splitlines():
       
   170                 branchname, branchheads = branchpart.split(' ', 1)
       
   171                 branchname = encoding.tolocal(urlreq.unquote(branchname))
       
   172                 branchheads = wireprototypes.decodelist(branchheads)
       
   173                 branchmap[branchname] = branchheads
       
   174             yield branchmap
       
   175         except TypeError:
       
   176             self._abort(error.ResponseError(_("unexpected response:"), d))
       
   177 
       
   178     @batchable
       
   179     def listkeys(self, namespace):
       
   180         if not self.capable('pushkey'):
       
   181             yield {}, None
       
   182         f = future()
       
   183         self.ui.debug('preparing listkeys for "%s"\n' % namespace)
       
   184         yield {'namespace': encoding.fromlocal(namespace)}, f
       
   185         d = f.value
       
   186         self.ui.debug('received listkey for "%s": %i bytes\n'
       
   187                       % (namespace, len(d)))
       
   188         yield pushkeymod.decodekeys(d)
       
   189 
       
   190     @batchable
       
   191     def pushkey(self, namespace, key, old, new):
       
   192         if not self.capable('pushkey'):
       
   193             yield False, None
       
   194         f = future()
       
   195         self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
       
   196         yield {'namespace': encoding.fromlocal(namespace),
       
   197                'key': encoding.fromlocal(key),
       
   198                'old': encoding.fromlocal(old),
       
   199                'new': encoding.fromlocal(new)}, f
       
   200         d = f.value
       
   201         d, output = d.split('\n', 1)
       
   202         try:
       
   203             d = bool(int(d))
       
   204         except ValueError:
       
   205             raise error.ResponseError(
       
   206                 _('push failed (unexpected response):'), d)
       
   207         for l in output.splitlines(True):
       
   208             self.ui.status(_('remote: '), l)
       
   209         yield d
       
   210 
       
   211     def stream_out(self):
       
   212         return self._callstream('stream_out')
       
   213 
       
   214     def getbundle(self, source, **kwargs):
       
   215         kwargs = pycompat.byteskwargs(kwargs)
       
   216         self.requirecap('getbundle', _('look up remote changes'))
       
   217         opts = {}
       
   218         bundlecaps = kwargs.get('bundlecaps') or set()
       
   219         for key, value in kwargs.iteritems():
       
   220             if value is None:
       
   221                 continue
       
   222             keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
       
   223             if keytype is None:
       
   224                 raise error.ProgrammingError(
       
   225                     'Unexpectedly None keytype for key %s' % key)
       
   226             elif keytype == 'nodes':
       
   227                 value = wireprototypes.encodelist(value)
       
   228             elif keytype == 'csv':
       
   229                 value = ','.join(value)
       
   230             elif keytype == 'scsv':
       
   231                 value = ','.join(sorted(value))
       
   232             elif keytype == 'boolean':
       
   233                 value = '%i' % bool(value)
       
   234             elif keytype != 'plain':
       
   235                 raise KeyError('unknown getbundle option type %s'
       
   236                                % keytype)
       
   237             opts[key] = value
       
   238         f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
       
   239         if any((cap.startswith('HG2') for cap in bundlecaps)):
       
   240             return bundle2.getunbundler(self.ui, f)
       
   241         else:
       
   242             return changegroupmod.cg1unpacker(f, 'UN')
       
   243 
       
   244     def unbundle(self, cg, heads, url):
       
   245         '''Send cg (a readable file-like object representing the
       
   246         changegroup to push, typically a chunkbuffer object) to the
       
   247         remote server as a bundle.
       
   248 
       
   249         When pushing a bundle10 stream, return an integer indicating the
       
   250         result of the push (see changegroup.apply()).
       
   251 
       
   252         When pushing a bundle20 stream, return a bundle20 stream.
       
   253 
       
   254         `url` is the url the client thinks it's pushing to, which is
       
   255         visible to hooks.
       
   256         '''
       
   257 
       
   258         if heads != ['force'] and self.capable('unbundlehash'):
       
   259             heads = wireprototypes.encodelist(
       
   260                 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
       
   261         else:
       
   262             heads = wireprototypes.encodelist(heads)
       
   263 
       
   264         if util.safehasattr(cg, 'deltaheader'):
       
   265             # this a bundle10, do the old style call sequence
       
   266             ret, output = self._callpush("unbundle", cg, heads=heads)
       
   267             if ret == "":
       
   268                 raise error.ResponseError(
       
   269                     _('push failed:'), output)
       
   270             try:
       
   271                 ret = int(ret)
       
   272             except ValueError:
       
   273                 raise error.ResponseError(
       
   274                     _('push failed (unexpected response):'), ret)
       
   275 
       
   276             for l in output.splitlines(True):
       
   277                 self.ui.status(_('remote: '), l)
       
   278         else:
       
   279             # bundle2 push. Send a stream, fetch a stream.
       
   280             stream = self._calltwowaystream('unbundle', cg, heads=heads)
       
   281             ret = bundle2.getunbundler(self.ui, stream)
       
   282         return ret
       
   283 
       
   284     # End of ipeercommands interface.
       
   285 
       
   286     # Begin of ipeerlegacycommands interface.
       
   287 
       
   288     def branches(self, nodes):
       
   289         n = wireprototypes.encodelist(nodes)
       
   290         d = self._call("branches", nodes=n)
       
   291         try:
       
   292             br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
       
   293             return br
       
   294         except ValueError:
       
   295             self._abort(error.ResponseError(_("unexpected response:"), d))
       
   296 
       
   297     def between(self, pairs):
       
   298         batch = 8 # avoid giant requests
       
   299         r = []
       
   300         for i in xrange(0, len(pairs), batch):
       
   301             n = " ".join([wireprototypes.encodelist(p, '-')
       
   302                           for p in pairs[i:i + batch]])
       
   303             d = self._call("between", pairs=n)
       
   304             try:
       
   305                 r.extend(l and wireprototypes.decodelist(l) or []
       
   306                          for l in d.splitlines())
       
   307             except ValueError:
       
   308                 self._abort(error.ResponseError(_("unexpected response:"), d))
       
   309         return r
       
   310 
       
   311     def changegroup(self, nodes, kind):
       
   312         n = wireprototypes.encodelist(nodes)
       
   313         f = self._callcompressable("changegroup", roots=n)
       
   314         return changegroupmod.cg1unpacker(f, 'UN')
       
   315 
       
   316     def changegroupsubset(self, bases, heads, kind):
       
   317         self.requirecap('changegroupsubset', _('look up remote changes'))
       
   318         bases = wireprototypes.encodelist(bases)
       
   319         heads = wireprototypes.encodelist(heads)
       
   320         f = self._callcompressable("changegroupsubset",
       
   321                                    bases=bases, heads=heads)
       
   322         return changegroupmod.cg1unpacker(f, 'UN')
       
   323 
       
   324     # End of ipeerlegacycommands interface.
       
   325 
       
   326     def _submitbatch(self, req):
       
   327         """run batch request <req> on the server
       
   328 
       
   329         Returns an iterator of the raw responses from the server.
       
   330         """
       
   331         ui = self.ui
       
   332         if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
       
   333             ui.debug('devel-peer-request: batched-content\n')
       
   334             for op, args in req:
       
   335                 msg = 'devel-peer-request:    - %s (%d arguments)\n'
       
   336                 ui.debug(msg % (op, len(args)))
       
   337 
       
   338         unescapearg = wireprototypes.unescapebatcharg
       
   339 
       
   340         rsp = self._callstream("batch", cmds=encodebatchcmds(req))
       
   341         chunk = rsp.read(1024)
       
   342         work = [chunk]
       
   343         while chunk:
       
   344             while ';' not in chunk and chunk:
       
   345                 chunk = rsp.read(1024)
       
   346                 work.append(chunk)
       
   347             merged = ''.join(work)
       
   348             while ';' in merged:
       
   349                 one, merged = merged.split(';', 1)
       
   350                 yield unescapearg(one)
       
   351             chunk = rsp.read(1024)
       
   352             work = [merged, chunk]
       
   353         yield unescapearg(''.join(work))
       
   354 
       
   355     def _submitone(self, op, args):
       
   356         return self._call(op, **pycompat.strkwargs(args))
       
   357 
       
   358     def debugwireargs(self, one, two, three=None, four=None, five=None):
       
   359         # don't pass optional arguments left at their default value
       
   360         opts = {}
       
   361         if three is not None:
       
   362             opts[r'three'] = three
       
   363         if four is not None:
       
   364             opts[r'four'] = four
       
   365         return self._call('debugwireargs', one=one, two=two, **opts)
       
   366 
       
   367     def _call(self, cmd, **args):
       
   368         """execute <cmd> on the server
       
   369 
       
   370         The command is expected to return a simple string.
       
   371 
       
   372         returns the server reply as a string."""
       
   373         raise NotImplementedError()
       
   374 
       
   375     def _callstream(self, cmd, **args):
       
   376         """execute <cmd> on the server
       
   377 
       
   378         The command is expected to return a stream. Note that if the
       
   379         command doesn't return a stream, _callstream behaves
       
   380         differently for ssh and http peers.
       
   381 
       
   382         returns the server reply as a file like object.
       
   383         """
       
   384         raise NotImplementedError()
       
   385 
       
   386     def _callcompressable(self, cmd, **args):
       
   387         """execute <cmd> on the server
       
   388 
       
   389         The command is expected to return a stream.
       
   390 
       
   391         The stream may have been compressed in some implementations. This
       
   392         function takes care of the decompression. This is the only difference
       
   393         with _callstream.
       
   394 
       
   395         returns the server reply as a file like object.
       
   396         """
       
   397         raise NotImplementedError()
       
   398 
       
   399     def _callpush(self, cmd, fp, **args):
       
   400         """execute a <cmd> on server
       
   401 
       
   402         The command is expected to be related to a push. Push has a special
       
   403         return method.
       
   404 
       
   405         returns the server reply as a (ret, output) tuple. ret is either
       
   406         empty (error) or a stringified int.
       
   407         """
       
   408         raise NotImplementedError()
       
   409 
       
   410     def _calltwowaystream(self, cmd, fp, **args):
       
   411         """execute <cmd> on server
       
   412 
       
   413         The command will send a stream to the server and get a stream in reply.
       
   414         """
       
   415         raise NotImplementedError()
       
   416 
       
   417     def _abort(self, exception):
       
   418         """clearly abort the wire protocol connection and raise the exception
       
   419         """
       
   420         raise NotImplementedError()