view mercurial/wireprotov1peer.py @ 45972:8b99c473aae2

copies-rust: move is_ancestor caching within the rust code Now that the OrdMap merging is fast, smaller things start to matters. We move the caching of `is_ancestor` call within the Rust code. This avoid round-trip to Python and help us to shave more time on our slower case: Repo Cases Source-Rev Dest-Rev Old-Time New-Time Difference Factor ------------------------------------------------------------------------------------------------------------------------------------ pypy x0000_revs_x_added_0_copies d1defd0dc478 c9cb1334cc78 : 2.780174 s, 2.137894 s, -0.642280 s, × 0.7690 mozilla-try x0000_revs_xx000_added_x000_copies 89294cd501d9 7ccb2fc7ccb5 : 9.843481 s, 8.100385 s, -1.743096 s, × 0.8229 Note: I would happily have used native code for ancestors computation, however I failed (did not tried hard) to created a rust version that goes as fast as the current C version. Below are full tables for: - this change compared to the previous change - this change compared to filelog performance Repo Cases Source-Rev Dest-Rev Old-Time New-Time Difference Factor ------------------------------------------------------------------------------------------------------------------------------------ mercurial x_revs_x_added_0_copies ad6b123de1c7 39cfcef4f463 : 0.000049 s, 0.000047 s, -0.000002 s, × 0.9592 mercurial x_revs_x_added_x_copies 2b1c78674230 0c1d10351869 : 0.000182 s, 0.000181 s, -0.000001 s, × 0.9945 mercurial x000_revs_x000_added_x_copies 81f8ff2a9bf2 dd3267698d84 : 0.005872 s, 0.005852 s, -0.000020 s, × 0.9966 pypy x_revs_x_added_0_copies aed021ee8ae8 099ed31b181b : 0.000229 s, 0.000229 s, +0.000000 s, × 1.0000 pypy x_revs_x000_added_0_copies 4aa4e1f8e19a 359343b9ac0e : 0.000058 s, 0.000058 s, +0.000000 s, × 1.0000 pypy x_revs_x_added_x_copies ac52eb7bbbb0 72e022663155 : 0.000148 s, 0.000146 s, -0.000002 s, × 0.9865 pypy x_revs_x00_added_x_copies c3b14617fbd7 ace7255d9a26 : 0.001205 s, 0.001206 s, +0.000001 s, × 1.0008 pypy x_revs_x000_added_x000_copies df6f7a526b60 
a83dc6a2d56f : 0.025662 s, 0.025275 s, -0.000387 s, × 0.9849 pypy x000_revs_xx00_added_0_copies 89a76aede314 2f22446ff07e : 0.080113 s, 0.080303 s, +0.000190 s, × 1.0024 pypy x000_revs_x000_added_x_copies 8a3b5bfd266e 2c68e87c3efe : 0.153030 s, 0.152641 s, -0.000389 s, × 0.9975 pypy x000_revs_x000_added_x000_copies 89a76aede314 7b3dda341c84 : 0.098774 s, 0.099107 s, +0.000333 s, × 1.0034 pypy x0000_revs_x_added_0_copies d1defd0dc478 c9cb1334cc78 : 2.780174 s, 2.137894 s, -0.642280 s, × 0.7690 pypy x0000_revs_xx000_added_0_copies bf2c629d0071 4ffed77c095c : 0.022218 s, 0.022202 s, -0.000016 s, × 0.9993 pypy x0000_revs_xx000_added_x000_copies 08ea3258278e d9fa043f30c0 : 0.252125 s, 0.228946 s, -0.023179 s, × 0.9081 netbeans x_revs_x_added_0_copies fb0955ffcbcd a01e9239f9e7 : 0.000186 s, 0.000186 s, +0.000000 s, × 1.0000 netbeans x_revs_x000_added_0_copies 6f360122949f 20eb231cc7d0 : 0.000133 s, 0.000133 s, +0.000000 s, × 1.0000 netbeans x_revs_x_added_x_copies 1ada3faf6fb6 5a39d12eecf4 : 0.000320 s, 0.000320 s, +0.000000 s, × 1.0000 netbeans x_revs_x00_added_x_copies 35be93ba1e2c 9eec5e90c05f : 0.001336 s, 0.001339 s, +0.000003 s, × 1.0022 netbeans x000_revs_xx00_added_0_copies eac3045b4fdd 51d4ae7f1290 : 0.015573 s, 0.015694 s, +0.000121 s, × 1.0078 netbeans x000_revs_x000_added_x_copies e2063d266acd 6081d72689dc : 0.018667 s, 0.018457 s, -0.000210 s, × 0.9888 netbeans x000_revs_x000_added_x000_copies ff453e9fee32 411350406ec2 : 0.112534 s, 0.111691 s, -0.000843 s, × 0.9925 netbeans x0000_revs_xx000_added_x000_copies 588c2d1ced70 1aad62e59ddd : 1.231869 s, 1.166017 s, -0.065852 s, × 0.9465 mozilla-central x_revs_x_added_0_copies 3697f962bb7b 7015fcdd43a2 : 0.000197 s, 0.000197 s, +0.000000 s, × 1.0000 mozilla-central x_revs_x000_added_0_copies dd390860c6c9 40d0c5bed75d : 0.000637 s, 0.000626 s, -0.000011 s, × 0.9827 mozilla-central x_revs_x_added_x_copies 8d198483ae3b 14207ffc2b2f : 0.000303 s, 0.000303 s, +0.000000 s, × 1.0000 mozilla-central 
x_revs_x00_added_x_copies 98cbc58cc6bc 446a150332c3 : 0.001663 s, 0.001679 s, +0.000016 s, × 1.0096 mozilla-central x_revs_x000_added_x000_copies 3c684b4b8f68 0a5e72d1b479 : 0.007008 s, 0.006947 s, -0.000061 s, × 0.9913 mozilla-central x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 0.127385 s, 0.133070 s, +0.005685 s, × 1.0446 mozilla-central x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 0.008740 s, 0.008705 s, -0.000035 s, × 0.9960 mozilla-central x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 0.005783 s, 0.005913 s, +0.000130 s, × 1.0225 mozilla-central x000_revs_x000_added_x000_copies 7c97034feb78 4407bd0c6330 : 0.102184 s, 0.101373 s, -0.000811 s, × 0.9921 mozilla-central x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 0.046220 s, 0.046526 s, +0.000306 s, × 1.0066 mozilla-central x0000_revs_xx000_added_x000_copies f78c615a656c 96a38b690156 : 0.315271 s, 0.313954 s, -0.001317 s, × 0.9958 mozilla-central x00000_revs_x0000_added_x0000_copies 6832ae71433c 4c222a1d9a00 : 3.478747 s, 3.367395 s, -0.111352 s, × 0.9680 mozilla-central x00000_revs_x00000_added_x000_copies 76caed42cf7c 1daa622bbe42 : 4.766435 s, 4.691820 s, -0.074615 s, × 0.9843 mozilla-try x_revs_x_added_0_copies aaf6dde0deb8 9790f499805a : 0.001214 s, 0.001199 s, -0.000015 s, × 0.9876 mozilla-try x_revs_x000_added_0_copies d8d0222927b4 5bb8ce8c7450 : 0.001221 s, 0.001216 s, -0.000005 s, × 0.9959 mozilla-try x_revs_x_added_x_copies 092fcca11bdb 936255a0384a : 0.000613 s, 0.000613 s, +0.000000 s, × 1.0000 mozilla-try x_revs_x00_added_x_copies b53d2fadbdb5 017afae788ec : 0.001904 s, 0.001906 s, +0.000002 s, × 1.0011 mozilla-try x_revs_x000_added_x000_copies 20408ad61ce5 6f0ee96e21ad : 0.093000 s, 0.092766 s, -0.000234 s, × 0.9975 mozilla-try x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 0.132194 s, 0.136074 s, +0.003880 s, × 1.0294 mozilla-try x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 0.009069 s, 0.009067 s, -0.000002 s, × 
0.9998 mozilla-try x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 0.006169 s, 0.006243 s, +0.000074 s, × 1.0120 mozilla-try x000_revs_x000_added_x000_copies 1346fd0130e4 4c65cbdabc1f : 0.115540 s, 0.114463 s, -0.001077 s, × 0.9907 mozilla-try x0000_revs_x_added_0_copies 63519bfd42ee a36a2a865d92 : 0.435381 s, 0.433683 s, -0.001698 s, × 0.9961 mozilla-try x0000_revs_x_added_x_copies 9fe69ff0762d bcabf2a78927 : 0.415461 s, 0.411278 s, -0.004183 s, × 0.9899 mozilla-try x0000_revs_xx000_added_x_copies 156f6e2674f2 4d0f2c178e66 : 0.155946 s, 0.155133 s, -0.000813 s, × 0.9948 mozilla-try x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 0.048521 s, 0.048933 s, +0.000412 s, × 1.0085 mozilla-try x0000_revs_xx000_added_x000_copies 89294cd501d9 7ccb2fc7ccb5 : 9.843481 s, 8.100385 s, -1.743096 s, × 0.8229 mozilla-try x0000_revs_x0000_added_x0000_copies e928c65095ed e951f4ad123a : 1.465128 s, 1.446720 s, -0.018408 s, × 0.9874 mozilla-try x00000_revs_x00000_added_0_copies dc8a3ca7010e d16fde900c9c : 1.374283 s, 1.369537 s, -0.004746 s, × 0.9965 mozilla-try x00000_revs_x0000_added_x0000_copies 8d3fafa80d4b eb884023b810 : 5.255158 s, 5.186079 s, -0.069079 s, × 0.9869 Repo Case Source-Rev Dest-Rev filelog sidedata Difference Factor -------------------------------------------------------------------------------------------------------------------------------------- mercurial x_revs_x_added_0_copies ad6b123de1c7 39cfcef4f463 : 0.000892 s, 0.000047 s, -0.000845 s, × 0.052691 mercurial x_revs_x_added_x_copies 2b1c78674230 0c1d10351869 : 0.001823 s, 0.000181 s, -0.001642 s, × 0.099287 mercurial x000_revs_x000_added_x_copies 81f8ff2a9bf2 dd3267698d84 : 0.018063 s, 0.005852 s, -0.012211 s, × 0.323977 pypy x_revs_x_added_0_copies aed021ee8ae8 099ed31b181b : 0.001505 s, 0.000229 s, -0.001276 s, × 0.152159 pypy x_revs_x000_added_0_copies 4aa4e1f8e19a 359343b9ac0e : 0.205895 s, 0.000058 s, -0.205837 s, × 0.000282 pypy x_revs_x_added_x_copies ac52eb7bbbb0 72e022663155 : 
0.017021 s, 0.000146 s, -0.016875 s, × 0.008578 pypy x_revs_x00_added_x_copies c3b14617fbd7 ace7255d9a26 : 0.019422 s, 0.001206 s, -0.018216 s, × 0.062095 pypy x_revs_x000_added_x000_copies df6f7a526b60 a83dc6a2d56f : 0.767740 s, 0.025275 s, -0.742465 s, × 0.032921 pypy x000_revs_xx00_added_0_copies 89a76aede314 2f22446ff07e : 1.188515 s, 0.080303 s, -1.108212 s, × 0.067566 pypy x000_revs_x000_added_x_copies 8a3b5bfd266e 2c68e87c3efe : 1.251968 s, 0.152641 s, -1.099327 s, × 0.121921 pypy x000_revs_x000_added_x000_copies 89a76aede314 7b3dda341c84 : 1.616799 s, 0.099107 s, -1.517692 s, × 0.061298 pypy x0000_revs_x_added_0_copies d1defd0dc478 c9cb1334cc78 : 0.001057 s, 2.137894 s, +2.136837 s, × 2022.605487 pypy x0000_revs_xx000_added_0_copies bf2c629d0071 4ffed77c095c : 1.069485 s, 0.022202 s, -1.047283 s, × 0.020760 pypy x0000_revs_xx000_added_x000_copies 08ea3258278e d9fa043f30c0 : 1.350162 s, 0.228946 s, -1.121216 s, × 0.169569 netbeans x_revs_x_added_0_copies fb0955ffcbcd a01e9239f9e7 : 0.028008 s, 0.000186 s, -0.027822 s, × 0.006641 netbeans x_revs_x000_added_0_copies 6f360122949f 20eb231cc7d0 : 0.132281 s, 0.000133 s, -0.132148 s, × 0.001005 netbeans x_revs_x_added_x_copies 1ada3faf6fb6 5a39d12eecf4 : 0.025311 s, 0.000320 s, -0.024991 s, × 0.012643 netbeans x_revs_x00_added_x_copies 35be93ba1e2c 9eec5e90c05f : 0.052957 s, 0.001339 s, -0.051618 s, × 0.025285 netbeans x000_revs_xx00_added_0_copies eac3045b4fdd 51d4ae7f1290 : 0.038011 s, 0.015694 s, -0.022317 s, × 0.412880 netbeans x000_revs_x000_added_x_copies e2063d266acd 6081d72689dc : 0.198639 s, 0.018457 s, -0.180182 s, × 0.092917 netbeans x000_revs_x000_added_x000_copies ff453e9fee32 411350406ec2 : 0.955713 s, 0.111691 s, -0.844022 s, × 0.116867 netbeans x0000_revs_xx000_added_x000_copies 588c2d1ced70 1aad62e59ddd : 3.838886 s, 1.166017 s, -2.672869 s, × 0.303738 mozilla-central x_revs_x_added_0_copies 3697f962bb7b 7015fcdd43a2 : 0.024548 s, 0.000197 s, -0.024351 s, × 0.008025 mozilla-central 
x_revs_x000_added_0_copies dd390860c6c9 40d0c5bed75d : 0.143394 s, 0.000626 s, -0.142768 s, × 0.004366 mozilla-central x_revs_x_added_x_copies 8d198483ae3b 14207ffc2b2f : 0.026046 s, 0.000303 s, -0.025743 s, × 0.011633 mozilla-central x_revs_x00_added_x_copies 98cbc58cc6bc 446a150332c3 : 0.085440 s, 0.001679 s, -0.083761 s, × 0.019651 mozilla-central x_revs_x000_added_x000_copies 3c684b4b8f68 0a5e72d1b479 : 0.195656 s, 0.006947 s, -0.188709 s, × 0.035506 mozilla-central x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 2.190874 s, 0.133070 s, -2.057804 s, × 0.060738 mozilla-central x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 0.090208 s, 0.008705 s, -0.081503 s, × 0.096499 mozilla-central x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 0.747367 s, 0.005913 s, -0.741454 s, × 0.007912 mozilla-central x000_revs_x000_added_x000_copies 7c97034feb78 4407bd0c6330 : 1.152863 s, 0.101373 s, -1.051490 s, × 0.087932 mozilla-central x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 6.598336 s, 0.046526 s, -6.551810 s, × 0.007051 mozilla-central x0000_revs_xx000_added_x000_copies f78c615a656c 96a38b690156 : 3.255015 s, 0.313954 s, -2.941061 s, × 0.096452 mozilla-central x00000_revs_x0000_added_x0000_copies 6832ae71433c 4c222a1d9a00 : 15.668041 s, 3.367395 s, -12.300646 s, × 0.214921 mozilla-central x00000_revs_x00000_added_x000_copies 76caed42cf7c 1daa622bbe42 : 20.439638 s, 4.691820 s, -15.747818 s, × 0.229545 mozilla-try x_revs_x_added_0_copies aaf6dde0deb8 9790f499805a : 0.080923 s, 0.001199 s, -0.079724 s, × 0.014817 mozilla-try x_revs_x000_added_0_copies d8d0222927b4 5bb8ce8c7450 : 0.498456 s, 0.001216 s, -0.497240 s, × 0.002440 mozilla-try x_revs_x_added_x_copies 092fcca11bdb 936255a0384a : 0.020798 s, 0.000613 s, -0.020185 s, × 0.029474 mozilla-try x_revs_x00_added_x_copies b53d2fadbdb5 017afae788ec : 0.226930 s, 0.001906 s, -0.225024 s, × 0.008399 mozilla-try x_revs_x000_added_x000_copies 20408ad61ce5 6f0ee96e21ad : 1.113005 s, 
0.092766 s, -1.020239 s, × 0.083347 mozilla-try x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 2.230671 s, 0.136074 s, -2.094597 s, × 0.061001 mozilla-try x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 0.089672 s, 0.009067 s, -0.080605 s, × 0.101113 mozilla-try x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 0.740221 s, 0.006243 s, -0.733978 s, × 0.008434 mozilla-try x000_revs_x000_added_x000_copies 1346fd0130e4 4c65cbdabc1f : 1.185881 s, 0.114463 s, -1.071418 s, × 0.096521 mozilla-try x0000_revs_x_added_0_copies 63519bfd42ee a36a2a865d92 : 0.086072 s, 0.433683 s, +0.347611 s, × 5.038607 mozilla-try x0000_revs_x_added_x_copies 9fe69ff0762d bcabf2a78927 : 0.081321 s, 0.411278 s, +0.329957 s, × 5.057464 mozilla-try x0000_revs_xx000_added_x_copies 156f6e2674f2 4d0f2c178e66 : 7.528370 s, 0.155133 s, -7.373237 s, × 0.020606 mozilla-try x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 6.757368 s, 0.048933 s, -6.708435 s, × 0.007241 mozilla-try x0000_revs_xx000_added_x000_copies 89294cd501d9 7ccb2fc7ccb5 : 7.643752 s, 8.100385 s, +0.456633 s, × 1.059739 mozilla-try x0000_revs_x0000_added_x0000_copies e928c65095ed e951f4ad123a : 9.704242 s, 1.446720 s, -8.257522 s, × 0.149081 mozilla-try x00000_revs_x_added_0_copies 6a320851d377 1ebb79acd503 : 0.092845 s, killed mozilla-try x00000_revs_x00000_added_0_copies dc8a3ca7010e d16fde900c9c : 26.626870 s, 1.369537 s, -25.257333 s, × 0.051434 mozilla-try x00000_revs_x_added_x_copies 5173c4b6f97c 95d83ee7242d : 0.092953 s, killed mozilla-try x00000_revs_x000_added_x_copies 9126823d0e9c ca82787bb23c : 0.227131 s, killed mozilla-try x00000_revs_x0000_added_x0000_copies 8d3fafa80d4b eb884023b810 : 18.884666 s, 5.186079 s, -13.698587 s, × 0.274619 mozilla-try x00000_revs_x00000_added_x0000_copies 1b661134e2ca 1ae03d022d6d : 21.451622 s, killed mozilla-try x00000_revs_x00000_added_x000_copies 9b2a99adc05e 8e29777b48e6 : 25.152558 s, killed Differential Revision: 
https://phab.mercurial-scm.org/D9303
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Fri, 02 Oct 2020 15:41:31 +0200
parents 89a2afe31e82
children 05dd091dfa6a
line wrap: on
line source

# wireprotov1peer.py - Client-side functionality for wire protocol version 1.
#
# Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import sys
import weakref

from .i18n import _
from .node import bin
from .pycompat import (
    getattr,
    setattr,
)
from . import (
    bundle2,
    changegroup as changegroupmod,
    encoding,
    error,
    pushkey as pushkeymod,
    pycompat,
    util,
    wireprototypes,
)
from .interfaces import (
    repository,
    util as interfaceutil,
)
from .utils import hashutil

# Module-level shorthand for ``util.urlreq``.
urlreq = util.urlreq


def batchable(f):
    """decorator marking a peer method as usable in a batch request

    The decorated method has to be written as a two-step coroutine:

    @batchable
    def sample(self, one, two=None):
        # Encode the arguments the way your wire protocol expects them:
        encargs = [('one', encode(one),), ('two', encode(two),)]
        # Make a future that will receive the still-encoded result:
        encresref = future()
        # First step: hand back the encoded arguments and that future:
        yield encargs, encresref
        # By now the future holds the raw result of the (possibly batched)
        # request. Second step: decode it and hand it back:
        yield decode(encresref.value)

    A coroutine may also yield a false value in place of the future; the
    first yielded element is then used as a locally computed result and no
    request is submitted.

    The function returned by this decorator drives the coroutine as an
    ordinary method call. The undecorated coroutine stays reachable through
    the "batchable" attribute so that batching code can run the encoding
    and decoding steps separately.
    """

    def plain(*args, **opts):
        coroutine = f(*args, **opts)
        payload, remoteref = next(coroutine)
        if remoteref:
            peer = args[0]
            # the command name sent on the wire must be an ascii bytestr
            wirecmd = pycompat.bytesurl(f.__name__)
            remoteref.set(peer._submitone(wirecmd, payload))
            return next(coroutine)
        # no future was handed out: the payload already is the result
        return payload

    setattr(plain, '__name__', f.__name__)
    setattr(plain, 'batchable', f)
    return plain


class future(object):
    '''write-once container for a value that becomes available later'''

    def set(self, value):
        # The value may be stored only once; a second attempt is an error.
        if not util.safehasattr(self, b'value'):
            self.value = value
            return
        raise error.RepoError(b"future is already set")


def encodebatchcmds(req):
    """Build the value of the ``cmds`` argument of the ``batch`` command.

    ``req`` is an iterable of ``(op, argsdict)`` pairs. Each pair is
    rendered as ``op name=value,...`` with names and values escaped, and
    the rendered commands are joined with ``;``.
    """
    escape = wireprototypes.escapebatcharg

    def encodeone(op, argsdict):
        # Argument names were not unescaped correctly by old servers, so
        # refuse any name that escaping would alter rather than send
        # something such a server may not decode properly.
        assert all(escape(name) == name for name in argsdict)

        pairs = [
            b'%s=%s' % (escape(name), escape(val))
            for name, val in pycompat.iteritems(argsdict)
        ]
        return b'%s %s' % (op, b','.join(pairs))

    return b';'.join([encodeone(op, argsdict) for op, argsdict in req])


class unsentfuture(pycompat.futures.Future):
    """Future standing in for a command that is buffered but not yet sent.

    Commands are queued rather than submitted right away, so waiting in
    ``result()`` for a command that was never sent could deadlock. To avoid
    that, ``result()`` on this type first calls ``sendcommands()`` on the
    owning executor.
    """

    def result(self, timeout=None):
        if not self.done():
            self._peerexecutor.sendcommands()

            # This is not the endless recursion it appears to be:
            # sendcommands() is expected to change __class__, so this call
            # doubles as a check that it did.
            return self.result(timeout)

        return pycompat.futures.Future.result(self, timeout)


@interfaceutil.implementer(repository.ipeercommandexecutor)
class peerexecutor(object):
    """Command executor that issues commands through methods of a peer.

    Batchable commands are queued until ``sendcommands()`` (or ``close()``)
    runs. A non-batchable command is issued as soon as it is requested and
    cannot share the executor with any other command.
    """

    def __init__(self, peer):
        self._peer = peer
        # True once sendcommands() has started issuing the queued commands.
        self._sent = False
        # True once close() has run.
        self._closed = False
        # Queued (command, args, fn, future) tuples; set to None when sent.
        self._calls = []
        # Weak references to every future handed out by callcommand().
        self._futures = weakref.WeakSet()
        # Thread pool and its future used to read a batch response; both
        # stay None unless a batch request is actually submitted.
        self._responseexecutor = None
        self._responsef = None

    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalee, exctb):
        # Always close. Nothing is returned, so an exception raised inside
        # the ``with`` body is not suppressed.
        self.close()

    def callcommand(self, command, args):
        """Request execution of ``command`` and return a future for it.

        ``args`` is later passed to the peer method as keyword arguments.

        Raises ``ProgrammingError`` when called after the commands were
        sent or after ``close()``, when the peer has no method named after
        ``command``, or when a non-batchable command is requested while
        other commands are already queued.
        """
        if self._sent:
            raise error.ProgrammingError(
                b'callcommand() cannot be used after commands are sent'
            )

        if self._closed:
            raise error.ProgrammingError(
                b'callcommand() cannot be used after close()'
            )

        # Commands are dispatched through methods on the peer.
        fn = getattr(self._peer, pycompat.sysstr(command), None)

        if not fn:
            raise error.ProgrammingError(
                b'cannot call command %s: method of same name not available '
                b'on peer' % command
            )

        # Commands are either batchable or they aren't. If a command
        # isn't batchable, we send it immediately because the executor
        # can no longer accept new commands after a non-batchable command.
        # If a command is batchable, we queue it for later. But we have
        # to account for the case of a non-batchable command arriving after
        # a batchable one and refuse to service it.

        def addcall():
            # Create the future for this command and queue the call.
            f = pycompat.futures.Future()
            self._futures.add(f)
            self._calls.append((command, args, fn, f))
            return f

        # The @batchable decorator stores the coroutine in this attribute.
        if getattr(fn, 'batchable', False):
            f = addcall()

            # But since we don't issue it immediately, we wrap its result()
            # to trigger sending so we avoid deadlocks.
            f.__class__ = unsentfuture
            f._peerexecutor = self
        else:
            if self._calls:
                raise error.ProgrammingError(
                    b'%s is not batchable and cannot be called on a command '
                    b'executor along with other commands' % command
                )

            f = addcall()

            # Non-batchable commands can never coexist with another command
            # in this executor. So send the command immediately.
            self.sendcommands()

        return f

    def sendcommands(self):
        """Issue every queued command.

        Does nothing if the commands were already sent or none is queued. A
        single queued command is called synchronously; several are
        submitted as one batch request whose responses are read in a
        separate thread.
        """
        if self._sent:
            return

        if not self._calls:
            return

        self._sent = True

        # Unhack any future types so caller sees a clean type and to break
        # cycle between us and futures.
        for f in self._futures:
            if isinstance(f, unsentfuture):
                f.__class__ = pycompat.futures.Future
                f._peerexecutor = None

        calls = self._calls
        # Mainly to destroy references to futures.
        self._calls = None

        # Simple case of a single command. We call it synchronously.
        if len(calls) == 1:
            command, args, fn, f = calls[0]

            # Future was cancelled. Ignore it.
            if not f.set_running_or_notify_cancel():
                return

            try:
                result = fn(**pycompat.strkwargs(args))
            except Exception:
                # Report the failure through the future instead of raising.
                pycompat.future_set_exception_info(f, sys.exc_info()[1:])
            else:
                f.set_result(result)

            return

        # Batch commands are a bit harder. First, we have to deal with the
        # @batchable coroutine. That's a bit annoying. Furthermore, we also
        # need to preserve streaming. i.e. it should be possible for the
        # futures to resolve as data is coming in off the wire without having
        # to wait for the final byte of the final response. We do this by
        # spinning up a thread to read the responses.

        # (command, encoded args) pairs to submit in the batch request.
        requests = []
        # (command, future, coroutine, remote future) for each request, in
        # the same order as ``requests``.
        states = []

        for command, args, fn, f in calls:
            # Future was cancelled. Ignore it.
            if not f.set_running_or_notify_cancel():
                continue

            try:
                batchable = fn.batchable(
                    fn.__self__, **pycompat.strkwargs(args)
                )
            except Exception:
                pycompat.future_set_exception_info(f, sys.exc_info()[1:])
                # NOTE(review): returning here abandons the whole batch; the
                # futures of the other queued commands are never resolved by
                # this method, and close() only fails leftover futures when
                # a response reader exists - confirm callers cannot block
                # on them.
                return

            # Encoded arguments and future holding remote result.
            try:
                encargsorres, fremote = next(batchable)
            except Exception:
                pycompat.future_set_exception_info(f, sys.exc_info()[1:])
                # NOTE(review): same early exit as above.
                return

            if not fremote:
                # The coroutine produced a local result; nothing to send.
                f.set_result(encargsorres)
            else:
                requests.append((command, encargsorres))
                states.append((command, f, batchable, fremote))

        if not requests:
            return

        # This will emit responses in order they were executed.
        wireresults = self._peer._submitbatch(requests)

        # The use of a thread pool executor here is a bit weird for something
        # that only spins up a single thread. However, thread management is
        # hard and it is easy to encounter race conditions, deadlocks, etc.
        # concurrent.futures already solves these problems and its thread pool
        # executor has minimal overhead. So we use it.
        self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
        self._responsef = self._responseexecutor.submit(
            self._readbatchresponse, states, wireresults
        )

    def close(self):
        """Send any queued commands and finish reading a batch response.

        When a response reader was started, wait for it, shut its executor
        down and set a ``ResponseError`` on every future that is still not
        done. An exception raised by the reader propagates to the caller.
        """
        # A no-op when the commands were already sent.
        self.sendcommands()

        if self._closed:
            return

        self._closed = True

        if not self._responsef:
            return

        # We need to wait on our in-flight response and then shut down the
        # executor once we have a result.
        try:
            self._responsef.result()
        finally:
            self._responseexecutor.shutdown(wait=True)
            self._responsef = None
            self._responseexecutor = None

            # If any of our futures are still in progress, mark them as
            # errored. Otherwise a result() could wait indefinitely.
            for f in self._futures:
                if not f.done():
                    f.set_exception(
                        error.ResponseError(
                            _(b'unfulfilled batch command response')
                        )
                    )

            self._futures = None

    def _readbatchresponse(self, states, wireresults):
        """Resolve the futures in ``states`` from ``wireresults``.

        ``states`` holds ``(command, future, coroutine, remote future)``
        tuples; one item is taken from ``wireresults`` per tuple.
        """
        # Executes in a thread to read data off the wire.

        for command, f, batchable, fremote in states:
            # Grab raw result off the wire and teach the internal future
            # about it.
            remoteresult = next(wireresults)
            fremote.set(remoteresult)

            # And ask the coroutine to decode that value.
            try:
                result = next(batchable)
            except Exception:
                pycompat.future_set_exception_info(f, sys.exc_info()[1:])
            else:
                f.set_result(result)


@interfaceutil.implementer(
    repository.ipeercommands, repository.ipeerlegacycommands
)
class wirepeer(repository.peer):
    """Client-side interface for communicating with a peer repository.

    Methods commonly call wire protocol commands of the same name.

    See also httppeer.py and sshpeer.py for protocol-specific
    implementations of this interface.
    """

    def commandexecutor(self):
        """Return a new ``peerexecutor`` bound to this peer."""
        return peerexecutor(self)

    # Begin of ipeercommands interface.

    def clonebundles(self):
        """Return what the ``clonebundles`` wire command replies.

        ``requirecap()`` is consulted for the ``clonebundles`` capability
        before the command is called.
        """
        self.requirecap(b'clonebundles', _(b'clone bundles'))
        return self._call(b'clonebundles')

    @batchable
    def lookup(self, key):
        """Batchable coroutine for the ``lookup`` wire command.

        ``requirecap()`` is consulted for the ``lookup`` capability first.
        The reply, minus its last byte, is split at the first space into a
        numeric flag and a payload: a non-zero flag yields ``bin(payload)``,
        otherwise the payload is handed to ``self._abort()`` wrapped in a
        ``RepoError``.
        """
        self.requirecap(b'lookup', _(b'look up remote revision'))
        remote = future()
        yield {b'key': encoding.fromlocal(key)}, remote
        reply = remote.value
        flag, payload = reply[:-1].split(b" ", 1)
        if not int(flag):
            self._abort(error.RepoError(payload))
        else:
            yield bin(payload)

    @batchable
    def heads(self):
        """Batchable coroutine for the ``heads`` wire command.

        Yields the reply, minus its last byte, decoded with
        ``wireprototypes.decodelist()``. A ``ValueError`` while doing so is
        turned into a ``ResponseError`` handed to ``self._abort()``.
        """
        remote = future()
        yield {}, remote
        raw = remote.value
        try:
            decoded = wireprototypes.decodelist(raw[:-1])
            yield decoded
        except ValueError:
            self._abort(error.ResponseError(_(b"unexpected response:"), raw))

    @batchable
    def known(self, nodes):
        """Batchable coroutine for the ``known`` wire command.

        Sends ``nodes`` encoded with ``wireprototypes.encodelist()`` and
        yields one boolean per byte of the reply (``bool(int(byte))``). A
        ``ValueError`` while converting is turned into a ``ResponseError``
        handed to ``self._abort()``.
        """
        remote = future()
        yield {b'nodes': wireprototypes.encodelist(nodes)}, remote
        raw = remote.value
        try:
            flags = []
            for digit in pycompat.iterbytestr(raw):
                flags.append(bool(int(digit)))
            yield flags
        except ValueError:
            self._abort(error.ResponseError(_(b"unexpected response:"), raw))

    @batchable
    def branchmap(self):
        """Batchable coroutine for the ``branchmap`` wire command.

        Each line of the reply is split at the first space into a
        URL-quoted branch name and a list of heads. Yields a dict mapping
        the unquoted, locally-encoded branch name to the heads decoded with
        ``wireprototypes.decodelist()``.

        A reply that cannot be parsed is reported through
        ``self._abort()`` as a ``ResponseError``.
        """
        f = future()
        yield {}, f
        d = f.value
        try:
            branchmap = {}
            for branchpart in d.splitlines():
                branchname, branchheads = branchpart.split(b' ', 1)
                branchname = encoding.tolocal(urlreq.unquote(branchname))
                branchheads = wireprototypes.decodelist(branchheads)
                branchmap[branchname] = branchheads
            yield branchmap
        # A line without a space fails the tuple unpacking with ValueError,
        # and on Python 3 bad hex data raises binascii.Error (a ValueError
        # subclass) rather than TypeError. Catch both so a malformed reply
        # is reported the same way as in heads() and known().
        except (TypeError, ValueError):
            self._abort(error.ResponseError(_(b"unexpected response:"), d))

    @batchable
    def listkeys(self, namespace):
        """Batchable coroutine for the ``listkeys`` wire command.

        Without the ``pushkey`` capability an empty dict is yielded with no
        future. Otherwise the reply for ``namespace`` is yielded after
        decoding it with ``pushkeymod.decodekeys()``.
        """
        if not self.capable(b'pushkey'):
            yield {}, None
        remote = future()
        self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
        wireargs = {b'namespace': encoding.fromlocal(namespace)}
        yield wireargs, remote
        raw = remote.value
        size = len(raw)
        self.ui.debug(
            b'received listkey for "%s": %i bytes\n' % (namespace, size)
        )
        yield pushkeymod.decodekeys(raw)

    @batchable
    def pushkey(self, namespace, key, old, new):
        """Batchable coroutine for the ``pushkey`` wire command.

        Without the ``pushkey`` capability ``False`` is yielded with no
        future. Otherwise the reply is split at the first newline into a
        numeric status and remote output; the output is echoed through
        ``self.ui.status()`` and the status is yielded as a boolean.

        Raises ``ResponseError`` if the status is not an integer.
        """
        if not self.capable(b'pushkey'):
            yield False, None
        remote = future()
        self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
        wireargs = {
            b'namespace': encoding.fromlocal(namespace),
            b'key': encoding.fromlocal(key),
            b'old': encoding.fromlocal(old),
            b'new': encoding.fromlocal(new),
        }
        yield wireargs, remote
        status, output = remote.value.split(b'\n', 1)
        try:
            succeeded = bool(int(status))
        except ValueError:
            raise error.ResponseError(
                _(b'push failed (unexpected response):'), status
            )
        # keepends=True: each remote line is forwarded with its line ending
        for line in output.splitlines(True):
            self.ui.status(_(b'remote: '), line)
        yield succeeded

    def stream_out(self):
        """Return what ``_callstream()`` gives for the ``stream_out`` command."""
        return self._callstream(b'stream_out')

    def getbundle(self, source, **kwargs):
        """Call the ``getbundle`` wire command and return an unbundler.

        Keyword arguments whose value is ``None`` are dropped. The others
        are encoded according to their type in
        ``wireprototypes.GETBUNDLE_ARGUMENTS``. If any entry of the
        ``bundlecaps`` argument starts with ``HG2`` the reply is wrapped
        with ``bundle2.getunbundler()``, otherwise with
        ``changegroupmod.cg1unpacker()``.

        Raises ``ProgrammingError`` for an argument with no registered type
        and ``KeyError`` for an unrecognised type.
        """
        kwargs = pycompat.byteskwargs(kwargs)
        self.requirecap(b'getbundle', _(b'look up remote changes'))
        bundlecaps = kwargs.get(b'bundlecaps') or set()
        argtypes = wireprototypes.GETBUNDLE_ARGUMENTS

        wireargs = {}
        for name, val in pycompat.iteritems(kwargs):
            if val is None:
                continue
            kind = argtypes.get(name)
            if kind is None:
                raise error.ProgrammingError(
                    b'Unexpectedly None keytype for key %s' % name
                )
            if kind == b'nodes':
                val = wireprototypes.encodelist(val)
            elif kind == b'csv':
                val = b','.join(val)
            elif kind == b'scsv':
                val = b','.join(sorted(val))
            elif kind == b'boolean':
                val = b'%i' % bool(val)
            elif kind != b'plain':
                raise KeyError(b'unknown getbundle option type %s' % kind)
            wireargs[name] = val

        stream = self._callcompressable(
            b"getbundle", **pycompat.strkwargs(wireargs)
        )
        for cap in bundlecaps:
            if cap.startswith(b'HG2'):
                return bundle2.getunbundler(self.ui, stream)
        return changegroupmod.cg1unpacker(stream, b'UN')

    def unbundle(self, bundle, heads, url):
        """Send ``bundle`` (a readable file-like object representing the
        changegroup to push, typically a chunkbuffer object) to the
        remote server.

        When pushing a bundle10 stream, return an integer indicating the
        result of the push (see changegroup.apply()).

        When pushing a bundle20 stream, return a bundle20 stream.

        `url` is the url the client thinks it's pushing to, which is
        visible to hooks.
        """

        if heads == [b'force'] or not self.capable(b'unbundlehash'):
            heads = wireprototypes.encodelist(heads)
        else:
            # send a digest of the sorted heads instead of the heads
            digest = hashutil.sha1(b''.join(sorted(heads))).digest()
            heads = wireprototypes.encodelist([b'hashed', digest])

        if not util.safehasattr(bundle, b'deltaheader'):
            # bundle2 push. Send a stream, fetch a stream.
            stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
            return bundle2.getunbundler(self.ui, stream)

        # this a bundle10, do the old style call sequence
        ret, output = self._callpush(b"unbundle", bundle, heads=heads)
        if ret == b"":
            raise error.ResponseError(_(b'push failed:'), output)
        try:
            ret = int(ret)
        except ValueError:
            raise error.ResponseError(
                _(b'push failed (unexpected response):'), ret
            )

        for line in output.splitlines(True):
            self.ui.status(_(b'remote: '), line)
        return ret

    # End of ipeercommands interface.

    # Begin of ipeerlegacycommands interface.

    def branches(self, nodes):
        """Call the ``branches`` wire command for ``nodes``.

        Returns a list with one tuple per line of the reply, each decoded
        with ``wireprototypes.decodelist()``. A ``ValueError`` while
        decoding is turned into a ``ResponseError`` handed to
        ``self._abort()``.
        """
        encoded = wireprototypes.encodelist(nodes)
        raw = self._call(b"branches", nodes=encoded)
        try:
            decoded = []
            for line in raw.splitlines():
                decoded.append(tuple(wireprototypes.decodelist(line)))
            return decoded
        except ValueError:
            self._abort(error.ResponseError(_(b"unexpected response:"), raw))

    def between(self, pairs):
        """Call the ``between`` wire command for ``pairs``.

        The pairs are sent in groups of at most eight per request. Returns
        one list per line of the replies: the decoded line, or an empty
        list for an empty line. A ``ValueError`` while decoding is turned
        into a ``ResponseError`` handed to ``self._abort()``.
        """
        groupsize = 8  # avoid giant requests
        found = []
        for start in pycompat.xrange(0, len(pairs), groupsize):
            group = pairs[start : start + groupsize]
            encoded = b" ".join(
                [wireprototypes.encodelist(pair, b'-') for pair in group]
            )
            raw = self._call(b"between", pairs=encoded)
            try:
                for line in raw.splitlines():
                    found.append(
                        wireprototypes.decodelist(line) if line else []
                    )
            except ValueError:
                self._abort(
                    error.ResponseError(_(b"unexpected response:"), raw)
                )
        return found

    def changegroup(self, nodes, source):
        """Run the legacy ``changegroup`` command with ``nodes`` as roots.

        The encoded ``nodes`` are sent as the ``roots`` argument through
        ``_callcompressable`` and the resulting stream is wrapped in a
        ``cg1unpacker`` created with the b'UN' marker.

        ``source`` is accepted but not used by this implementation.
        """
        stream = self._callcompressable(
            b"changegroup", roots=wireprototypes.encodelist(nodes)
        )
        return changegroupmod.cg1unpacker(stream, b'UN')

    def changegroupsubset(self, bases, heads, source):
        """Run the legacy ``changegroupsubset`` command.

        The b'changegroupsubset' capability is required first (via
        ``self.requirecap``).  ``bases`` and ``heads`` are then encoded and
        sent through ``_callcompressable``; the resulting stream is wrapped
        in a ``cg1unpacker`` created with the b'UN' marker.

        ``source`` is accepted but not used by this implementation.
        """
        self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
        encode = wireprototypes.encodelist
        stream = self._callcompressable(
            b"changegroupsubset", bases=encode(bases), heads=encode(heads)
        )
        return changegroupmod.cg1unpacker(stream, b'UN')

    # End of ipeerlegacycommands interface.

    def _submitbatch(self, req):
        """run batch request <req> on the server

        <req> is handed to encodebatchcmds() to build the ``cmds`` argument
        of the ``batch`` command; when debug tracing is enabled it is also
        iterated as (op, args) pairs.

        Returns an iterator of the raw responses from the server.

        The reply stream is read in chunks of at most 1024 bytes and split
        on b';'. Each piece is passed through
        wireprototypes.unescapebatcharg before being yielded. Since this is
        a generator, the request is only issued once iteration starts.
        """
        ui = self.ui
        # optional developer tracing: list every batched command together
        # with its number of arguments
        if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
            ui.debug(b'devel-peer-request: batched-content\n')
            for op, args in req:
                msg = b'devel-peer-request:    - %s (%d arguments)\n'
                ui.debug(msg % (op, len(args)))

        unescapearg = wireprototypes.unescapebatcharg

        rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
        chunk = rsp.read(1024)
        # pieces of data read so far that do not form a complete response yet
        work = [chunk]
        while chunk:
            # keep reading until the latest chunk contains a separator or
            # the stream is exhausted (empty read)
            while b';' not in chunk and chunk:
                chunk = rsp.read(1024)
                work.append(chunk)
            merged = b''.join(work)
            # emit every complete, ';'-terminated response; the trailing
            # partial piece stays in ``merged``
            while b';' in merged:
                one, merged = merged.split(b';', 1)
                yield unescapearg(one)
            # carry the incomplete tail over, together with the next chunk
            chunk = rsp.read(1024)
            work = [merged, chunk]
        # whatever is left once the stream is exhausted is the last response
        yield unescapearg(b''.join(work))

    def _submitone(self, op, args):
        """Run the single command ``op`` on the server.

        ``args`` is converted with ``pycompat.strkwargs`` and expanded into
        keyword arguments of ``self._call``, whose result is returned.
        """
        kwargs = pycompat.strkwargs(args)
        return self._call(op, **kwargs)

    def debugwireargs(self, one, two, three=None, four=None, five=None):
        """Run the ``debugwireargs`` command and return the server reply.

        ``one`` and ``two`` are always sent.  ``three`` and ``four`` are
        only sent when they are not None.  ``five`` is accepted but never
        forwarded to the server.
        """
        # don't pass optional arguments left at their default value
        optional = (('three', three), ('four', four))
        extra = {name: value for name, value in optional if value is not None}
        return self._call(b'debugwireargs', one=one, two=two, **extra)

    def _call(self, cmd, **args):
        """Run <cmd> on the server and return its reply as a string.

        The command is expected to answer with a simple string.

        This base version only raises NotImplementedError.
        """
        raise NotImplementedError

    def _callstream(self, cmd, **args):
        """Run <cmd> on the server and return its reply as a file like object.

        The command is expected to answer with a stream. Note that when it
        does not, _callstream behaves differently for ssh and http peers.

        This base version only raises NotImplementedError.
        """
        raise NotImplementedError

    def _callcompressable(self, cmd, **args):
        """Run <cmd> on the server and return its reply as a file like object.

        The command is expected to answer with a stream.

        Some implementations may have compressed that stream; taking care of
        the decompression is the job of this function, and is its only
        difference with _callstream.

        This base version only raises NotImplementedError.
        """
        raise NotImplementedError

    def _callpush(self, cmd, fp, **args):
        """Run the push related <cmd> on the server.

        Push commands have a special return method: the server reply comes
        back as a (ret, output) tuple, where ret is either empty (error) or
        a stringified int.

        This base version only raises NotImplementedError.
        """
        raise NotImplementedError

    def _calltwowaystream(self, cmd, fp, **args):
        """Run <cmd> on the server, exchanging streams in both directions.

        A stream is sent to the server and a stream is received in reply.

        This base version only raises NotImplementedError.
        """
        raise NotImplementedError

    def _abort(self, exception):
        """Cleanly abort the wire protocol connection and raise <exception>.

        This base version only raises NotImplementedError.
        """
        raise NotImplementedError