view mercurial/commandserver.py @ 46582:b0a3ca02d17a

copies-rust: implement PartialEqual manually Now that we know that each (dest, rev) pair has at most a unique CopySource, we can simplify comparison a lot. This "simple" step buy a good share of the previous slowdown back in some case: Repo Case Source-Rev Dest-Rev # of revisions old time new time Difference Factor time per rev --------------------------------------------------------------------------------------------------------------------------------------------------------------- mozilla-try x00000_revs_x00000_added_x000_copies 9b2a99adc05e 8e29777b48e6 : 382065 revs, 43.304637 s, 34.443661 s, -8.860976 s, × 0.7954, 90 µs/rev Full benchmark: Repo Case Source-Rev Dest-Rev # of revisions old time new time Difference Factor time per rev --------------------------------------------------------------------------------------------------------------------------------------------------------------- mercurial x_revs_x_added_0_copies ad6b123de1c7 39cfcef4f463 : 1 revs, 0.000043 s, 0.000043 s, +0.000000 s, × 1.0000, 43 µs/rev mercurial x_revs_x_added_x_copies 2b1c78674230 0c1d10351869 : 6 revs, 0.000114 s, 0.000117 s, +0.000003 s, × 1.0263, 19 µs/rev mercurial x000_revs_x000_added_x_copies 81f8ff2a9bf2 dd3267698d84 : 1032 revs, 0.004937 s, 0.004892 s, -0.000045 s, × 0.9909, 4 µs/rev pypy x_revs_x_added_0_copies aed021ee8ae8 099ed31b181b : 9 revs, 0.000339 s, 0.000196 s, -0.000143 s, × 0.5782, 21 µs/rev pypy x_revs_x000_added_0_copies 4aa4e1f8e19a 359343b9ac0e : 1 revs, 0.000049 s, 0.000050 s, +0.000001 s, × 1.0204, 50 µs/rev pypy x_revs_x_added_x_copies ac52eb7bbbb0 72e022663155 : 7 revs, 0.000202 s, 0.000117 s, -0.000085 s, × 0.5792, 16 µs/rev pypy x_revs_x00_added_x_copies c3b14617fbd7 ace7255d9a26 : 1 revs, 0.000409 s, 0.6f1f4a s, -0.000087 s, × 0.7873, 322 µs/rev pypy x_revs_x000_added_x000_copies df6f7a526b60 a83dc6a2d56f : 6 revs, 0.011984 s, 0.011949 s, -0.000035 s, × 0.9971, 1991 µs/rev pypy x000_revs_xx00_added_0_copies 89a76aede314 2f22446ff07e : 4785 revs, 0.050820 s, 0.050802 s, -0.000018 s, × 0.9996, 10 µs/rev pypy x000_revs_x000_added_x_copies 8a3b5bfd266e 2c68e87c3efe : 6780 revs, 0.087953 s, 0.088090 s, +0.000137 s, × 1.0016, 12 µs/rev pypy x000_revs_x000_added_x000_copies 89a76aede314 7b3dda341c84 : 5441 revs, 0.062902 s, 0.062079 s, -0.000823 s, × 0.9869, 11 µs/rev pypy x0000_revs_x_added_0_copies d1defd0dc478 c9cb1334cc78 : 43645 revs, 0.679234 s, 0.635337 s, -0.043897 s, × 0.9354, 14 µs/rev pypy x0000_revs_xx000_added_0_copies bf2c629d0071 4ffed77c095c : 2 revs, 0.013095 s, 0.013262 s, +0.000167 s, × 1.0128, 6631 µs/rev pypy x0000_revs_xx000_added_x000_copies 08ea3258278e d9fa043f30c0 : 11316 revs, 0.120910 s, 0.120085 s, -0.000825 s, × 0.9932, 10 µs/rev netbeans x_revs_x_added_0_copies fb0955ffcbcd a01e9239f9e7 : 2 revs, 0.000087 s, 0.000085 s, -0.000002 s, × 0.9770, 42 µs/rev netbeans x_revs_x000_added_0_copies 6f360122949f 20eb231cc7d0 : 2 revs, 0.000107 s, 0.000110 s, +0.000003 s, × 1.0280, 55 µs/rev netbeans x_revs_x_added_x_copies 1ada3faf6fb6 5a39d12eecf4 : 3 revs, 0.000186 s, 0.000177 s, -0.000009 s, × 0.9516, 59 µs/rev netbeans x_revs_x00_added_x_copies 35be93ba1e2c 9eec5e90c05f : 9 revs, 0.000754 s, 0.000743 s, -0.000011 s, × 0.9854, 82 µs/rev netbeans x000_revs_xx00_added_0_copies eac3045b4fdd 51d4ae7f1290 : 1421 revs, 0.010443 s, 0.010168 s, -0.000275 s, × 0.9737, 7 µs/rev netbeans x000_revs_x000_added_x_copies e2063d266acd 6081d72689dc : 1533 revs, 0.015697 s, 0.015946 s, +0.000249 s, × 1.0159, 10 µs/rev netbeans x000_revs_x000_added_x000_copies ff453e9fee32 411350406ec2 : 5750 revs, 0.063528 s, 0.062712 s, -0.000816 s, × 0.9872, 10 µs/rev netbeans x0000_revs_xx000_added_x000_copies 588c2d1ced70 1aad62e59ddd : 66949 revs, 0.545515 s, 0.523832 s, -0.021683 s, × 0.9603, 7 µs/rev mozilla-central x_revs_x_added_0_copies 3697f962bb7b 7015fcdd43a2 : 2 revs, 0.000089 s, 0.000090 s, +0.000001 s, × 1.0112, 45 µs/rev mozilla-central x_revs_x000_added_0_copies dd390860c6c9 40d0c5bed75d : 8 revs, 0.000265 s, 0.000264 s, -0.000001 s, × 0.9962, 33 µs/rev mozilla-central x_revs_x_added_x_copies 8d198483ae3b 14207ffc2b2f : 9 revs, 0.000381 s, 0.000187 s, -0.000194 s, × 0.4908, 20 µs/rev mozilla-central x_revs_x00_added_x_copies 98cbc58cc6bc 446a150332c3 : 7 revs, 0.000672 s, 0.000665 s, -0.000007 s, × 0.9896, 95 µs/rev mozilla-central x_revs_x000_added_x000_copies 3c684b4b8f68 0a5e72d1b479 : 3 revs, 0.003497 s, 0.003556 s, +0.000059 s, × 1.0169, 1185 µs/rev mozilla-central x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 6 revs, 0.073204 s, 0.071345 s, -0.001859 s, × 0.9746, 11890 µs/rev mozilla-central x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 1593 revs, 0.006482 s, 0.006551 s, +0.000069 s, × 1.0106, 4 µs/rev mozilla-central x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 41 revs, 0.005066 s, 0.005078 s, +0.000012 s, × 1.0024, 123 µs/rev mozilla-central x000_revs_x000_added_x000_copies 7c97034feb78 4407bd0c6330 : 7839 revs, 0.065707 s, 0.065823 s, +0.000116 s, × 1.0018, 8 µs/rev mozilla-central x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 615 revs, 0.026800 s, 0.027050 s, +0.000250 s, × 1.0093, 43 µs/rev mozilla-central x0000_revs_xx000_added_x000_copies f78c615a656c 96a38b690156 : 30263 revs, 0.203856 s, 0.202443 s, -0.001413 s, × 0.9931, 6 µs/rev mozilla-central x00000_revs_x0000_added_x0000_copies 6832ae71433c 4c222a1d9a00 : 153721 revs, 1.293394 s, 1.261583 s, -0.031811 s, × 0.9754, 8 µs/rev mozilla-central x00000_revs_x00000_added_x000_copies 76caed42cf7c 1daa622bbe42 : 204976 revs, 1.698239 s, 1.643869 s, -0.054370 s, × 0.9680, 8 µs/rev mozilla-try x_revs_x_added_0_copies aaf6dde0deb8 9790f499805a : 2 revs, 0.000875 s, 0.000868 s, -0.000007 s, × 0.9920, 434 µs/rev mozilla-try x_revs_x000_added_0_copies d8d0222927b4 5bb8ce8c7450 : 2 revs, 0.000891 s, 0.000887 s, -0.000004 s, × 0.9955, 443 µs/rev mozilla-try x_revs_x_added_x_copies 092fcca11bdb 936255a0384a : 4 revs, 0.000292 s, 0.000168 s, -0.000124 s, × 0.5753, 42 µs/rev mozilla-try x_revs_x00_added_x_copies b53d2fadbdb5 017afae788ec : 2 revs, 0.003939 s, 0.001160 s, -0.002779 s, × 0.2945, 580 µs/rev mozilla-try x_revs_x000_added_x000_copies 20408ad61ce5 6f0ee96e21ad : 1 revs, 0.033027 s, 0.033016 s, -0.000011 s, × 0.9997, 33016 µs/rev mozilla-try x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 6 revs, 0.073703 s, 0.073312 s, -0.39ae31 s, × 0.9947, 12218 µs/rev mozilla-try x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 1593 revs, 0.006469 s, 0.006485 s, +0.000016 s, × 1.0025, 4 µs/rev mozilla-try x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 41 revs, 0.005278 s, 0.005494 s, +0.000216 s, × 1.0409, 134 µs/rev mozilla-try x000_revs_x000_added_x000_copies 1346fd0130e4 4c65cbdabc1f : 6657 revs, 0.064995 s, 0.064879 s, -0.000116 s, × 0.9982, 9 µs/rev mozilla-try x0000_revs_x_added_0_copies 63519bfd42ee a36a2a865d92 : 40314 revs, 0.301041 s, 0.301469 s, +0.000428 s, × 1.0014, 7 µs/rev mozilla-try x0000_revs_x_added_x_copies 9fe69ff0762d bcabf2a78927 : 38690 revs, 0.285575 s, 0.297113 s, +0.011538 s, × 1.0404, 7 µs/rev mozilla-try x0000_revs_xx000_added_x_copies 156f6e2674f2 4d0f2c178e66 : 8598 revs, 0.085597 s, 0.085890 s, +0.000293 s, × 1.0034, 9 µs/rev mozilla-try x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 615 revs, 0.027118 s, 0.027718 s, +0.000600 s, × 1.0221, 45 µs/rev mozilla-try x0000_revs_xx000_added_x000_copies 89294cd501d9 7ccb2fc7ccb5 : 97052 revs, 2.119204 s, 2.048949 s, -0.070255 s, × 0.9668, 21 µs/rev mozilla-try x0000_revs_x0000_added_x0000_copies e928c65095ed e951f4ad123a : 52031 revs, 0.701479 s, 0.685924 s, -0.015555 s, × 0.9778, 13 µs/rev mozilla-try x00000_revs_x_added_0_copies 6a320851d377 1ebb79acd503 : 363753 revs, 4.482399 s, 4.482891 s, +0.000492 s, × 1.0001, 12 µs/rev mozilla-try x00000_revs_x00000_added_0_copies dc8a3ca7010e d16fde900c9c : 34414 revs, 0.574082 s, 0.577633 s, +0.003551 s, × 1.0062, 16 µs/rev mozilla-try x00000_revs_x_added_x_copies 5173c4b6f97c 95d83ee7242d : 362229 revs, 4.480366 s, 4.397816 s, -0.082550 s, × 0.9816, 12 µs/rev mozilla-try x00000_revs_x000_added_x_copies 9126823d0e9c ca82787bb23c : 359344 revs, 4.369070 s, 4.370538 s, +0.001468 s, × 1.0003, 12 µs/rev mozilla-try x00000_revs_x0000_added_x0000_copies 8d3fafa80d4b eb884023b810 : 192665 revs, 1.592506 s, 1.570439 s, -0.022067 s, × 0.9861, 8 µs/rev mozilla-try x00000_revs_x00000_added_x0000_copies 1b661134e2ca 1ae03d022d6d : 228985 revs, 87.824489 s, 88.388512 s, +0.564023 s, × 1.0064, 386 µs/rev mozilla-try x00000_revs_x00000_added_x000_copies 9b2a99adc05e 8e29777b48e6 : 382065 revs, 43.304637 s, 34.443661 s, -8.860976 s, × 0.7954, 90 µs/rev private : 459513 revs, 33.853687 s, 27.370148 s, -6.483539 s, × 0.8085, 59 µs/rev Differential Revision: https://phab.mercurial-scm.org/D9653
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 16 Dec 2020 11:11:05 +0100
parents 49b6910217f9
children d4ba4d51f85f
line wrap: on
line source

# commandserver.py - communicate with Mercurial's API over a pipe
#
#  Copyright Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno
import gc
import os
import random
import signal
import socket
import struct
import traceback

try:
    import selectors

    selectors.BaseSelector
except ImportError:
    from .thirdparty import selectors2 as selectors

from .i18n import _
from .pycompat import getattr
from . import (
    encoding,
    error,
    loggingutil,
    pycompat,
    repocache,
    util,
    vfs as vfsmod,
)
from .utils import (
    cborutil,
    procutil,
)


class channeledoutput(object):
    """
    Write data to out in the following format:

    data length (unsigned int),
    data
    """

    def __init__(self, out, channel):
        self.out = out
        self.channel = channel

    @property
    def name(self):
        return b'<%c-channel>' % self.channel

    def write(self, data):
        if not data:
            return
        # single write() to guarantee the same atomicity as the underlying file
        self.out.write(struct.pack(b'>cI', self.channel, len(data)) + data)
        self.out.flush()

    def __getattr__(self, attr):
        if attr in ('isatty', 'fileno', 'tell', 'seek'):
            raise AttributeError(attr)
        return getattr(self.out, attr)


class channeledmessage(object):
    """
    Write encoded message and metadata to out in the following format:

    data length (unsigned int),
    encoded message and metadata, as a flat key-value dict.

    Each message should have 'type' attribute. Messages of unknown type
    should be ignored.
    """

    # teach ui that write() can take **opts
    structured = True

    def __init__(self, out, channel, encodename, encodefn):
        self._cout = channeledoutput(out, channel)
        self.encoding = encodename
        self._encodefn = encodefn

    def write(self, data, **opts):
        opts = pycompat.byteskwargs(opts)
        if data is not None:
            opts[b'data'] = data
        self._cout.write(self._encodefn(opts))

    def __getattr__(self, attr):
        return getattr(self._cout, attr)


class channeledinput(object):
    """
    Read data from in_.

    Requests for input are written to out in the following format:
    channel identifier - 'I' for plain input, 'L' line based (1 byte)
    how many bytes to send at most (unsigned int),

    The client replies with:
    data length (unsigned int), 0 meaning EOF
    data
    """

    maxchunksize = 4 * 1024

    def __init__(self, in_, out, channel):
        self.in_ = in_
        self.out = out
        self.channel = channel

    @property
    def name(self):
        return b'<%c-channel>' % self.channel

    def read(self, size=-1):
        if size < 0:
            # if we need to consume all the clients input, ask for 4k chunks
            # so the pipe doesn't fill up risking a deadlock
            size = self.maxchunksize
            s = self._read(size, self.channel)
            buf = s
            while s:
                s = self._read(size, self.channel)
                buf += s

            return buf
        else:
            return self._read(size, self.channel)

    def _read(self, size, channel):
        if not size:
            return b''
        assert size > 0

        # tell the client we need at most size bytes
        self.out.write(struct.pack(b'>cI', channel, size))
        self.out.flush()

        length = self.in_.read(4)
        length = struct.unpack(b'>I', length)[0]
        if not length:
            return b''
        else:
            return self.in_.read(length)

    def readline(self, size=-1):
        if size < 0:
            size = self.maxchunksize
            s = self._read(size, b'L')
            buf = s
            # keep asking for more until there's either no more or
            # we got a full line
            while s and not s.endswith(b'\n'):
                s = self._read(size, b'L')
                buf += s

            return buf
        else:
            return self._read(size, b'L')

    def __iter__(self):
        return self

    def next(self):
        l = self.readline()
        if not l:
            raise StopIteration
        return l

    __next__ = next

    def __getattr__(self, attr):
        if attr in ('isatty', 'fileno', 'tell', 'seek'):
            raise AttributeError(attr)
        return getattr(self.in_, attr)


_messageencoders = {
    b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
}


def _selectmessageencoder(ui):
    encnames = ui.configlist(b'cmdserver', b'message-encodings')
    for n in encnames:
        f = _messageencoders.get(n)
        if f:
            return n, f
    raise error.Abort(
        b'no supported message encodings: %s' % b' '.join(encnames)
    )


class server(object):
    """
    Listens for commands on fin, runs them and writes the output on a channel
    based stream to fout.
    """

    def __init__(self, ui, repo, fin, fout, prereposetups=None):
        self.cwd = encoding.getcwd()

        if repo:
            # the ui here is really the repo ui so take its baseui so we don't
            # end up with its local configuration
            self.ui = repo.baseui
            self.repo = repo
            self.repoui = repo.ui
        else:
            self.ui = ui
            self.repo = self.repoui = None
        self._prereposetups = prereposetups

        self.cdebug = channeledoutput(fout, b'd')
        self.cerr = channeledoutput(fout, b'e')
        self.cout = channeledoutput(fout, b'o')
        self.cin = channeledinput(fin, fout, b'I')
        self.cresult = channeledoutput(fout, b'r')

        if self.ui.config(b'cmdserver', b'log') == b'-':
            # switch log stream of server's ui to the 'd' (debug) channel
            # (don't touch repo.ui as its lifetime is longer than the server)
            self.ui = self.ui.copy()
            setuplogging(self.ui, repo=None, fp=self.cdebug)

        self.cmsg = None
        if ui.config(b'ui', b'message-output') == b'channel':
            encname, encfn = _selectmessageencoder(ui)
            self.cmsg = channeledmessage(fout, b'm', encname, encfn)

        self.client = fin

        # If shutdown-on-interrupt is off, the default SIGINT handler is
        # removed so that client-server communication wouldn't be interrupted.
        # For example, 'runcommand' handler will issue three short read()s.
        # If one of the first two read()s were interrupted, the communication
        # channel would be left at dirty state and the subsequent request
        # wouldn't be parsed. So catching KeyboardInterrupt isn't enough.
        self._shutdown_on_interrupt = ui.configbool(
            b'cmdserver', b'shutdown-on-interrupt'
        )
        self._old_inthandler = None
        if not self._shutdown_on_interrupt:
            self._old_inthandler = signal.signal(signal.SIGINT, signal.SIG_IGN)

    def cleanup(self):
        """release and restore resources taken during server session"""
        if not self._shutdown_on_interrupt:
            signal.signal(signal.SIGINT, self._old_inthandler)

    def _read(self, size):
        if not size:
            return b''

        data = self.client.read(size)

        # is the other end closed?
        if not data:
            raise EOFError

        return data

    def _readstr(self):
        """read a string from the channel

        format:
        data length (uint32), data
        """
        length = struct.unpack(b'>I', self._read(4))[0]
        if not length:
            return b''
        return self._read(length)

    def _readlist(self):
        """read a list of NULL separated strings from the channel"""
        s = self._readstr()
        if s:
            return s.split(b'\0')
        else:
            return []

    def _dispatchcommand(self, req):
        from . import dispatch  # avoid cycle

        if self._shutdown_on_interrupt:
            # no need to restore SIGINT handler as it is unmodified.
            return dispatch.dispatch(req)

        try:
            signal.signal(signal.SIGINT, self._old_inthandler)
            return dispatch.dispatch(req)
        except error.SignalInterrupt:
            # propagate SIGBREAK, SIGHUP, or SIGTERM.
            raise
        except KeyboardInterrupt:
            # SIGINT may be received out of the try-except block of dispatch(),
            # so catch it as last ditch. Another KeyboardInterrupt may be
            # raised while handling exceptions here, but there's no way to
            # avoid that except for doing everything in C.
            pass
        finally:
            signal.signal(signal.SIGINT, signal.SIG_IGN)
        # On KeyboardInterrupt, print error message and exit *after* SIGINT
        # handler removed.
        req.ui.error(_(b'interrupted!\n'))
        return -1

    def runcommand(self):
        """reads a list of \0 terminated arguments, executes
        and writes the return code to the result channel"""
        from . import dispatch  # avoid cycle

        args = self._readlist()

        # copy the uis so changes (e.g. --config or --verbose) don't
        # persist between requests
        copiedui = self.ui.copy()
        uis = [copiedui]
        if self.repo:
            self.repo.baseui = copiedui
            # clone ui without using ui.copy because this is protected
            repoui = self.repoui.__class__(self.repoui)
            repoui.copy = copiedui.copy  # redo copy protection
            uis.append(repoui)
            self.repo.ui = self.repo.dirstate._ui = repoui
            self.repo.invalidateall()

        for ui in uis:
            ui.resetstate()
            # any kind of interaction must use server channels, but chg may
            # replace channels by fully functional tty files. so nontty is
            # enforced only if cin is a channel.
            if not util.safehasattr(self.cin, b'fileno'):
                ui.setconfig(b'ui', b'nontty', b'true', b'commandserver')

        req = dispatch.request(
            args[:],
            copiedui,
            self.repo,
            self.cin,
            self.cout,
            self.cerr,
            self.cmsg,
            prereposetups=self._prereposetups,
        )

        try:
            ret = self._dispatchcommand(req) & 255
            # If shutdown-on-interrupt is off, it's important to write the
            # result code *after* SIGINT handler removed. If the result code
            # were lost, the client wouldn't be able to continue processing.
            self.cresult.write(struct.pack(b'>i', int(ret)))
        finally:
            # restore old cwd
            if b'--cwd' in args:
                os.chdir(self.cwd)

    def getencoding(self):
        """ writes the current encoding to the result channel """
        self.cresult.write(encoding.encoding)

    def serveone(self):
        cmd = self.client.readline()[:-1]
        if cmd:
            handler = self.capabilities.get(cmd)
            if handler:
                handler(self)
            else:
                # clients are expected to check what commands are supported by
                # looking at the servers capabilities
                raise error.Abort(_(b'unknown command %s') % cmd)

        return cmd != b''

    capabilities = {b'runcommand': runcommand, b'getencoding': getencoding}

    def serve(self):
        hellomsg = b'capabilities: ' + b' '.join(sorted(self.capabilities))
        hellomsg += b'\n'
        hellomsg += b'encoding: ' + encoding.encoding
        hellomsg += b'\n'
        if self.cmsg:
            hellomsg += b'message-encoding: %s\n' % self.cmsg.encoding
        hellomsg += b'pid: %d' % procutil.getpid()
        if util.safehasattr(os, b'getpgid'):
            hellomsg += b'\n'
            hellomsg += b'pgid: %d' % os.getpgid(0)

        # write the hello msg in -one- chunk
        self.cout.write(hellomsg)

        try:
            while self.serveone():
                pass
        except EOFError:
            # we'll get here if the client disconnected while we were reading
            # its request
            return 1

        return 0


def setuplogging(ui, repo=None, fp=None):
    """Set up server logging facility

    If cmdserver.log is '-', log messages will be sent to the given fp.
    It should be the 'd' channel while a client is connected, and otherwise
    is the stderr of the server process.
    """
    # developer config: cmdserver.log
    logpath = ui.config(b'cmdserver', b'log')
    if not logpath:
        return
    # developer config: cmdserver.track-log
    tracked = set(ui.configlist(b'cmdserver', b'track-log'))

    if logpath == b'-' and fp:
        logger = loggingutil.fileobjectlogger(fp, tracked)
    elif logpath == b'-':
        logger = loggingutil.fileobjectlogger(ui.ferr, tracked)
    else:
        logpath = os.path.abspath(util.expandpath(logpath))
        # developer config: cmdserver.max-log-files
        maxfiles = ui.configint(b'cmdserver', b'max-log-files')
        # developer config: cmdserver.max-log-size
        maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
        vfs = vfsmod.vfs(os.path.dirname(logpath))
        logger = loggingutil.filelogger(
            vfs,
            os.path.basename(logpath),
            tracked,
            maxfiles=maxfiles,
            maxsize=maxsize,
        )

    targetuis = {ui}
    if repo:
        targetuis.add(repo.baseui)
        targetuis.add(repo.ui)
    for u in targetuis:
        u.setlogger(b'cmdserver', logger)


class pipeservice(object):
    def __init__(self, ui, repo, opts):
        self.ui = ui
        self.repo = repo

    def init(self):
        pass

    def run(self):
        ui = self.ui
        # redirect stdio to null device so that broken extensions or in-process
        # hooks will never cause corruption of channel protocol.
        with ui.protectedfinout() as (fin, fout):
            sv = server(ui, self.repo, fin, fout)
            try:
                return sv.serve()
            finally:
                sv.cleanup()


def _initworkerprocess():
    # use a different process group from the master process, in order to:
    # 1. make the current process group no longer "orphaned" (because the
    #    parent of this process is in a different process group while
    #    remains in a same session)
    #    according to POSIX 2.2.2.52, orphaned process group will ignore
    #    terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
    #    cause trouble for things like ncurses.
    # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
    #    SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
    #    processes like ssh will be killed properly, without affecting
    #    unrelated processes.
    os.setpgid(0, 0)
    # change random state otherwise forked request handlers would have a
    # same state inherited from parent.
    random.seed()


def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
    fin = conn.makefile('rb')
    fout = conn.makefile('wb')
    sv = None
    try:
        sv = createcmdserver(repo, conn, fin, fout, prereposetups)
        try:
            sv.serve()
        # handle exceptions that may be raised by command server. most of
        # known exceptions are caught by dispatch.
        except error.Abort as inst:
            ui.error(_(b'abort: %s\n') % inst.message)
        except IOError as inst:
            if inst.errno != errno.EPIPE:
                raise
        except KeyboardInterrupt:
            pass
        finally:
            sv.cleanup()
    except:  # re-raises
        # also write traceback to error channel. otherwise client cannot
        # see it because it is written to server's stderr by default.
        if sv:
            cerr = sv.cerr
        else:
            cerr = channeledoutput(fout, b'e')
        cerr.write(encoding.strtolocal(traceback.format_exc()))
        raise
    finally:
        fin.close()
        try:
            fout.close()  # implicit flush() may cause another EPIPE
        except IOError as inst:
            if inst.errno != errno.EPIPE:
                raise


class unixservicehandler(object):
    """Set of pluggable operations for unix-mode services

    Almost all methods except for createcmdserver() are called in the main
    process. You can't pass mutable resource back from createcmdserver().
    """

    pollinterval = None

    def __init__(self, ui):
        self.ui = ui

    def bindsocket(self, sock, address):
        util.bindunixsocket(sock, address)
        sock.listen(socket.SOMAXCONN)
        self.ui.status(_(b'listening at %s\n') % address)
        self.ui.flush()  # avoid buffering of status message

    def unlinksocket(self, address):
        os.unlink(address)

    def shouldexit(self):
        """True if server should shut down; checked per pollinterval"""
        return False

    def newconnection(self):
        """Called when main process notices new connection"""

    def createcmdserver(self, repo, conn, fin, fout, prereposetups):
        """Create new command server instance; called in the process that
        serves for the current connection"""
        return server(self.ui, repo, fin, fout, prereposetups)


class unixforkingservice(object):
    """
    Listens on unix domain socket and forks server per connection
    """

    def __init__(self, ui, repo, opts, handler=None):
        self.ui = ui
        self.repo = repo
        self.address = opts[b'address']
        if not util.safehasattr(socket, b'AF_UNIX'):
            raise error.Abort(_(b'unsupported platform'))
        if not self.address:
            raise error.Abort(_(b'no socket path specified with --address'))
        self._servicehandler = handler or unixservicehandler(ui)
        self._sock = None
        self._mainipc = None
        self._workeripc = None
        self._oldsigchldhandler = None
        self._workerpids = set()  # updated by signal handler; do not iterate
        self._socketunlinked = None
        # experimental config: cmdserver.max-repo-cache
        maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
        if maxlen < 0:
            raise error.Abort(_(b'negative max-repo-cache size not allowed'))
        self._repoloader = repocache.repoloader(ui, maxlen)
        # attempt to avoid crash in CoreFoundation when using chg after fix in
        # a89381e04c58
        if pycompat.isdarwin:
            procutil.gui()

    def init(self):
        self._sock = socket.socket(socket.AF_UNIX)
        # IPC channel from many workers to one main process; this is actually
        # a uni-directional pipe, but is backed by a DGRAM socket so each
        # message can be easily separated.
        o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
        self._mainipc, self._workeripc = o
        self._servicehandler.bindsocket(self._sock, self.address)
        if util.safehasattr(procutil, b'unblocksignal'):
            procutil.unblocksignal(signal.SIGCHLD)
        o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
        self._oldsigchldhandler = o
        self._socketunlinked = False
        self._repoloader.start()

    def _unlinksocket(self):
        if not self._socketunlinked:
            self._servicehandler.unlinksocket(self.address)
            self._socketunlinked = True

    def _cleanup(self):
        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
        self._sock.close()
        self._mainipc.close()
        self._workeripc.close()
        self._unlinksocket()
        self._repoloader.stop()
        # don't kill child processes as they have active clients, just wait
        self._reapworkers(0)

    def run(self):
        try:
            self._mainloop()
        finally:
            self._cleanup()

    def _mainloop(self):
        exiting = False
        h = self._servicehandler
        selector = selectors.DefaultSelector()
        selector.register(
            self._sock, selectors.EVENT_READ, self._acceptnewconnection
        )
        selector.register(
            self._mainipc, selectors.EVENT_READ, self._handlemainipc
        )
        while True:
            if not exiting and h.shouldexit():
                # clients can no longer connect() to the domain socket, so
                # we stop queuing new requests.
                # for requests that are queued (connect()-ed, but haven't been
                # accept()-ed), handle them before exit. otherwise, clients
                # waiting for recv() will receive ECONNRESET.
                self._unlinksocket()
                exiting = True
            try:
                events = selector.select(timeout=h.pollinterval)
            except OSError as inst:
                # selectors2 raises ETIMEDOUT if timeout exceeded while
                # handling signal interrupt. That's probably wrong, but
                # we can easily get around it.
                if inst.errno != errno.ETIMEDOUT:
                    raise
                events = []
            if not events:
                # only exit if we completed all queued requests
                if exiting:
                    break
                continue
            for key, _mask in events:
                key.data(key.fileobj, selector)
        selector.close()

    def _acceptnewconnection(self, sock, selector):
        h = self._servicehandler
        try:
            conn, _addr = sock.accept()
        except socket.error as inst:
            if inst.args[0] == errno.EINTR:
                return
            raise

        # Future improvement: On Python 3.7, maybe gc.freeze() can be used
        # to prevent COW memory from being touched by GC.
        # https://instagram-engineering.com/
        #   copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
        pid = os.fork()
        if pid:
            try:
                self.ui.log(
                    b'cmdserver', b'forked worker process (pid=%d)\n', pid
                )
                self._workerpids.add(pid)
                h.newconnection()
            finally:
                conn.close()  # release handle in parent process
        else:
            try:
                selector.close()
                sock.close()
                self._mainipc.close()
                self._runworker(conn)
                conn.close()
                self._workeripc.close()
                os._exit(0)
            except:  # never return, hence no re-raises
                try:
                    self.ui.traceback(force=True)
                finally:
                    os._exit(255)

    def _handlemainipc(self, sock, selector):
        """Process messages sent from a worker"""
        try:
            path = sock.recv(32768)  # large enough to receive path
        except socket.error as inst:
            if inst.args[0] == errno.EINTR:
                return
            raise
        self._repoloader.load(path)

    def _sigchldhandler(self, signal, frame):
        self._reapworkers(os.WNOHANG)

    def _reapworkers(self, options):
        while self._workerpids:
            try:
                pid, _status = os.waitpid(-1, options)
            except OSError as inst:
                if inst.errno == errno.EINTR:
                    continue
                if inst.errno != errno.ECHILD:
                    raise
                # no child processes at all (reaped by other waitpid()?)
                self._workerpids.clear()
                return
            if pid == 0:
                # no waitable child processes
                return
            self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
            self._workerpids.discard(pid)

    def _runworker(self, conn):
        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
        _initworkerprocess()
        h = self._servicehandler
        try:
            _serverequest(
                self.ui,
                self.repo,
                conn,
                h.createcmdserver,
                prereposetups=[self._reposetup],
            )
        finally:
            gc.collect()  # trigger __del__ since worker process uses os._exit

    def _reposetup(self, ui, repo):
        if not repo.local():
            return

        class unixcmdserverrepo(repo.__class__):
            def close(self):
                super(unixcmdserverrepo, self).close()
                try:
                    self._cmdserveripc.send(self.root)
                except socket.error:
                    self.ui.log(
                        b'cmdserver', b'failed to send repo root to master\n'
                    )

        repo.__class__ = unixcmdserverrepo
        repo._cmdserveripc = self._workeripc

        cachedrepo = self._repoloader.get(repo.root)
        if cachedrepo is None:
            return
        repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
        repocache.copycache(cachedrepo, repo)