view tests/test-stdio.py @ 47010:76ae43d5b1db stable

repoview: fix memory leak of filtered repo classes The leak occurs in long-running server processes with extensions, and is measured at 110kB per request. Before this change, the contents of the `_filteredrepotypes` cache are not properly garbage collected, despite it being a `WeakKeyDictionary`. Extensions have a tendency to generate a new repository class for each `localrepo` instantiation. Server processes based on `hgwebdir_mod` will instantiate a new `localrepo` for each HTTP request that involves a repository. As a result, with a testing process that repeatedly opens a repository with several extensions activated (`topic` notably among them), we see a steady increase in resident memory of 110kB per repository instantiation before this change. This is true whether or not we call `gc.collect()` at each instantiation, like `hgwebdir_mod` does. The cause of the leak is that the *values* aren't weak references. This change uses `weakref.ref` for the values, and in our measurements this makes the resident size increase drop to 5kB per repository instantiation, with no explicit call of `gc.collect()` at all. There is currently no reason to believe that this remaining leak of 5kB is related to or even due to Mercurial core. We've also seen evidence that `ui.ui` instances weren't properly garbage collected before the change (with the change, they are). This could explain why the figures are relatively high. In theory, the collection of weak references could lead to many more misses in the cache, so we measured the impact on the original case that was the motivation for introducing that cache in 7e89bd0cfb86 (see also issue5043): `hg convert` of the mozilla-central repository. The bad news here is that there is a major memory leak there, both with and without the present changeset. There were no more cache misses, and we could see no more memory leak with this change: the resident size after importing roughly 100000 changesets was at 12.4GB before, and 12.5GB after.
The small increase is mentioned for completeness only, and we believe that it should be ignored, at least as long as the main leak isn't fixed. At less than 1% of the main leak, even finding out whether it is merely noise would be wasteful. Original context where this was spotted and first mitigated: https://foss.heptapod.net/heptapod/heptapod/-/issues/466 The leak reduction was also obtained in Heptapod's inner HTTP server, which amounts to the same as `hgwebdir_mod` for these questions. The measurements were done with Python 3.9, with similar figures seen with 3.8. More work on our side would be needed to give measurements with 2.7, because the testing server process does not support it.
author Georges Racinet <georges.racinet@octobus.net>
date Fri, 23 Apr 2021 18:30:53 +0200
parents c102b704edb5
children 23f5ed6dbcb1
line wrap: on
line source

#!/usr/bin/env python3
"""
Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
"""
from __future__ import absolute_import

import contextlib
import errno
import os
import signal
import subprocess
import sys
import tempfile
import unittest

from mercurial import pycompat, util


if pycompat.ispy3:

    def set_noninheritable(fd):
        """Do nothing: Python 3 file descriptors are non-inheritable by default."""


elif pycompat.iswindows:
    # Python 2 on Windows: unused (callers in this file either guard on
    # `pycompat.iswindows` or skip the test before reaching it).
    set_noninheritable = None
else:
    import fcntl

    def set_noninheritable(fd):
        """Add the close-on-exec flag to `fd`, keeping its other FD flags."""
        current_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, current_flags | fcntl.FD_CLOEXEC)


# Source of the child process used by the buffering tests. `{stream}` is
# filled in through str.format() with 'stdout' or 'stderr'.
#
# The child alternates between writing payload through the procutil stream
# object (b'aaa', then b'bbb\n') and writing a marker straight to the
# underlying file descriptor with os.write(). The position of the payload
# relative to the markers in the received output therefore shows when the
# stream object actually passed the payload on to the file descriptor.
#
# Note that the template is a raw string, so the second marker contains a
# literal backslash followed by 'n', not a newline.
TEST_BUFFERING_CHILD_SCRIPT = r'''
import os

from mercurial import dispatch
from mercurial.utils import procutil

dispatch.initstdio()
procutil.{stream}.write(b'aaa')
os.write(procutil.{stream}.fileno(), b'[written aaa]')
procutil.{stream}.write(b'bbb\n')
os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
'''
# Expected output when each payload write arrives before the marker that
# follows it.
UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
# Expected output when b'aaa' is held back until the newline-terminated
# b'bbb\n' has been written.
LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
# Expected output when all payload arrives only after both markers.
FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'


# Source of the child process used by the large-write tests. `{stream}` and
# `{write_result_fn!r}` are filled in through str.format().
#
# The child installs a SIGINT handler that does nothing, issues a single
# write() call of 1048576 b'x' bytes on the procutil stream and stores
# str() of that call's return value in the file named by `write_result_fn`,
# where the parent reads it back.
#
# os.O_TEMPORARY is only added when the os module defines it.
# NOTE(review): presumably needed on Windows to open a file the parent keeps
# open as a NamedTemporaryFile -- confirm.
TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
import os
import signal
import sys

from mercurial import dispatch
from mercurial.utils import procutil

signal.signal(signal.SIGINT, lambda *x: None)
dispatch.initstdio()
write_result = procutil.{stream}.write(b'x' * 1048576)
with os.fdopen(
    os.open({write_result_fn!r}, os.O_WRONLY | getattr(os, 'O_TEMPORARY', 0)),
    'w',
) as write_result_f:
    write_result_f.write(str(write_result))
'''


# Source of the child process used by the broken-pipe tests. `{stream}` and
# `{err_fn!r}` are filled in through str.format().
#
# The child blocks reading one byte from stdin, then writes b'test' to the
# procutil stream and flushes it. If that raises an EnvironmentError, the
# exception object is pickled into the file named by `err_fn` so that the
# parent can inspect it. The os.O_BINARY and os.O_TEMPORARY flags are only
# added when the os module defines them.
TEST_BROKEN_PIPE_CHILD_SCRIPT = r'''
import os
import pickle

from mercurial import dispatch
from mercurial.utils import procutil

dispatch.initstdio()
procutil.stdin.read(1)  # wait until parent process closed pipe
try:
    procutil.{stream}.write(b'test')
    procutil.{stream}.flush()
except EnvironmentError as e:
    with os.fdopen(
        os.open(
            {err_fn!r},
            os.O_WRONLY
            | getattr(os, 'O_BINARY', 0)
            | getattr(os, 'O_TEMPORARY', 0),
        ),
        'wb',
    ) as err_f:
        pickle.dump(e, err_f)
# Exit early to suppress further broken pipe errors at interpreter shutdown.
os._exit(0)
'''


@contextlib.contextmanager
def _closing(fds):
    """Context manager that closes every file descriptor in `fds` on exit.

    Closing is best effort: a descriptor that cannot be closed (for example
    because the body of the `with` block already closed it) is skipped
    silently, and the remaining descriptors are still closed.
    """

    def close_quietly(descriptor):
        try:
            os.close(descriptor)
        except EnvironmentError:
            # Already closed or otherwise unusable; nothing left to release.
            pass

    try:
        yield
    finally:
        for descriptor in fds:
            close_quietly(descriptor)


# In the following, we set the FDs non-inheritable mainly to make it possible
# for tests to close the receiving end of the pipe / PTYs.


@contextlib.contextmanager
def _devnull():
    """Yield `(None, fd)` where `fd` is the null device opened write-only.

    The first element is None because there is nothing to read back from;
    the descriptor is closed when the context exits.
    """
    sink_fd = os.open(os.devnull, os.O_WRONLY)
    # We don't have a receiving end, so it's not worth the effort on Python 2
    # on Windows to make the FD non-inheritable.
    with _closing([sink_fd]):
        yield None, sink_fd


@contextlib.contextmanager
def _pipes():
    """Yield the `(read_fd, write_fd)` pair of a freshly created pipe.

    Both descriptors are closed when the context exits.
    """
    read_fd, write_fd = os.pipe()
    # Pipes are already non-inheritable on Windows.
    if not pycompat.iswindows:
        for pipe_fd in (read_fd, write_fd):
            set_noninheritable(pipe_fd)
    with _closing((read_fd, write_fd)):
        yield read_fd, write_fd


@contextlib.contextmanager
def _ptys():
    """Yield the FD pair returned by `pty.openpty()`.

    The first descriptor of the pair is put into raw mode before it is
    handed out. Both descriptors are closed when the context exits.

    Raises unittest.SkipTest on Windows.
    """
    if pycompat.iswindows:
        raise unittest.SkipTest("PTYs are not supported on Windows")
    import pty
    import tty

    first_fd, second_fd = pty.openpty()
    for pty_fd in (first_fd, second_fd):
        set_noninheritable(pty_fd)
    with _closing((first_fd, second_fd)):
        tty.setraw(first_fd)
        yield first_fd, second_fd


def _readall(fd, buffer_size, initial_buf=None):
    """Read from `fd` until end of data and return everything as bytes.

    `buffer_size` is the maximum number of bytes requested per os.read()
    call. `initial_buf`, if given, is a sequence of bytes chunks that were
    already received; they are prepended to the result. The sequence itself
    is left untouched.

    Reading stops when os.read() returns an empty result or fails with EIO.
    Any other OSError is propagated.
    """
    # Work on a copy so that the caller's list is never modified.
    chunks = list(initial_buf) if initial_buf else []
    while True:
        try:
            chunk = os.read(fd, buffer_size)
        except OSError as e:
            if e.errno == errno.EIO:
                # If the child-facing PTY got closed, reading from the
                # parent-facing PTY raises EIO.
                break
            raise
        if not chunk:
            break
        chunks.append(chunk)
    return b''.join(chunks)


class TestStdio(unittest.TestCase):
    """Tests of the stdio streams as set up by `dispatch.initstdio()`.

    Each test runs one of the `TEST_*_CHILD_SCRIPT` templates in a child
    Python process whose stdout or stderr is attached to the null device, a
    pipe or a PTY, and checks what arrives on the parent's side.
    """

    def _test(
        self,
        child_script,
        stream,
        rwpair_generator,
        check_output,
        python_args=None,
        post_child_check=None,
        stdin_generator=None,
    ):
        """Run `child_script` in a child interpreter and check the outcome.

        `stream` ('stdout' or 'stderr') selects which child stream is
        connected to the second FD yielded by `rwpair_generator()`; the
        other stream is inherited from this process.

        `check_output(stream_receiver, proc)` is called while the child is
        running, unless the generator yields None as the receiving FD.
        `python_args` are extra interpreter arguments placed before `-c`.
        `post_child_check()`, if given, is called after the child exited
        with status 0. `stdin_generator` is a context manager producing the
        child's stdin; it defaults to the null device opened for reading.
        """
        assert stream in ('stdout', 'stderr')
        if python_args is None:
            python_args = []
        if stdin_generator is None:
            stdin_generator = open(os.devnull, 'rb')
        # The stdin context is entered first so that it is exited (and the
        # default null device file closed) even if entering the FD pair
        # context raises, e.g. unittest.SkipTest.
        with stdin_generator as child_stdin, rwpair_generator() as (
            stream_receiver,
            child_stream,
        ):
            proc = subprocess.Popen(
                [sys.executable] + list(python_args) + ['-c', child_script],
                stdin=child_stdin,
                stdout=child_stream if stream == 'stdout' else None,
                stderr=child_stream if stream == 'stderr' else None,
            )
            try:
                # The child has its own copy of this FD by now.
                # NOTE(review): presumably closed here so that the receiving
                # end sees end of data once the child exits -- confirm.
                os.close(child_stream)
                if stream_receiver is not None:
                    check_output(stream_receiver, proc)
            except:  # re-raises
                proc.terminate()
                raise
            finally:
                retcode = proc.wait()
            self.assertEqual(retcode, 0)
            if post_child_check is not None:
                post_child_check()

    def _test_buffering(
        self, stream, rwpair_generator, expected_output, python_args=None
    ):
        """Check that the buffering child script produces `expected_output`.

        `expected_output` is compared with everything read from the
        receiving FD; it is not used when there is no receiving FD.
        """

        def check_output(stream_receiver, proc):
            self.assertEqual(_readall(stream_receiver, 1024), expected_output)

        self._test(
            TEST_BUFFERING_CHILD_SCRIPT.format(stream=stream),
            stream,
            rwpair_generator,
            check_output,
            python_args,
        )

    def test_buffering_stdout_devnull(self):
        self._test_buffering('stdout', _devnull, None)

    def test_buffering_stdout_pipes(self):
        self._test_buffering('stdout', _pipes, FULLY_BUFFERED)

    def test_buffering_stdout_ptys(self):
        self._test_buffering('stdout', _ptys, LINE_BUFFERED)

    def test_buffering_stdout_devnull_unbuffered(self):
        self._test_buffering('stdout', _devnull, None, python_args=['-u'])

    def test_buffering_stdout_pipes_unbuffered(self):
        self._test_buffering('stdout', _pipes, UNBUFFERED, python_args=['-u'])

    def test_buffering_stdout_ptys_unbuffered(self):
        self._test_buffering('stdout', _ptys, UNBUFFERED, python_args=['-u'])

    if not pycompat.ispy3 and not pycompat.iswindows:
        # On Python 2 on non-Windows, we manually open stdout in line-buffered
        # mode if connected to a TTY. We should check if Python was configured
        # to use unbuffered stdout, but it's hard to do that.
        test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
            test_buffering_stdout_ptys_unbuffered
        )

    def _test_large_write(self, stream, rwpair_generator, python_args=None):
        """Check that a single 1048576-byte write arrives completely.

        The child's write() return value is passed back through a temporary
        file and checked as well.
        """
        if not pycompat.ispy3 and pycompat.isdarwin:
            # Python 2 doesn't always retry on EINTR, but the libc might retry.
            # So far, it was observed only on macOS that EINTR is raised at the
            # Python level. As Python 2 support will be dropped soon-ish, we
            # won't attempt to fix it.
            raise unittest.SkipTest("raises EINTR on macOS")

        def check_output(stream_receiver, proc):
            if not pycompat.iswindows:
                # On Unix, we can provoke a partial write() by interrupting it
                # by a signal handler as soon as a bit of data was written.
                # We test that write() is called until all data is written.
                buf = [os.read(stream_receiver, 1)]
                proc.send_signal(signal.SIGINT)
            else:
                # On Windows, there doesn't seem to be a way to cause partial
                # writes.
                buf = []
            self.assertEqual(
                _readall(stream_receiver, 131072, buf), b'x' * 1048576
            )

        def post_child_check():
            write_result_str = write_result_f.read()
            if pycompat.ispy3:
                # On Python 3, we test that the correct number of bytes is
                # claimed to have been written.
                expected_write_result_str = '1048576'
            else:
                # On Python 2, we only check that the large write does not
                # crash.
                expected_write_result_str = 'None'
            self.assertEqual(write_result_str, expected_write_result_str)

        with tempfile.NamedTemporaryFile('r') as write_result_f:
            self._test(
                TEST_LARGE_WRITE_CHILD_SCRIPT.format(
                    stream=stream, write_result_fn=write_result_f.name
                ),
                stream,
                rwpair_generator,
                check_output,
                python_args,
                post_child_check=post_child_check,
            )

    def test_large_write_stdout_devnull(self):
        self._test_large_write('stdout', _devnull)

    def test_large_write_stdout_pipes(self):
        self._test_large_write('stdout', _pipes)

    def test_large_write_stdout_ptys(self):
        self._test_large_write('stdout', _ptys)

    def test_large_write_stdout_devnull_unbuffered(self):
        self._test_large_write('stdout', _devnull, python_args=['-u'])

    def test_large_write_stdout_pipes_unbuffered(self):
        self._test_large_write('stdout', _pipes, python_args=['-u'])

    def test_large_write_stdout_ptys_unbuffered(self):
        self._test_large_write('stdout', _ptys, python_args=['-u'])

    def test_large_write_stderr_devnull(self):
        self._test_large_write('stderr', _devnull)

    def test_large_write_stderr_pipes(self):
        self._test_large_write('stderr', _pipes)

    def test_large_write_stderr_ptys(self):
        self._test_large_write('stderr', _ptys)

    def test_large_write_stderr_devnull_unbuffered(self):
        self._test_large_write('stderr', _devnull, python_args=['-u'])

    def test_large_write_stderr_pipes_unbuffered(self):
        self._test_large_write('stderr', _pipes, python_args=['-u'])

    def test_large_write_stderr_ptys_unbuffered(self):
        self._test_large_write('stderr', _ptys, python_args=['-u'])

    def _test_broken_pipe(self, stream):
        """Check that writing to a closed pipe raises EPIPE in the child.

        The exception caught by the child is passed back pickled through a
        temporary file.
        """
        assert stream in ('stdout', 'stderr')

        def check_output(stream_receiver, proc):
            # Close the receiving end first, then unblock the child, which
            # waits for one byte on its stdin before writing.
            os.close(stream_receiver)
            proc.stdin.write(b'x')
            proc.stdin.close()

        def post_child_check():
            err = util.pickle.load(err_f)
            self.assertEqual(err.errno, errno.EPIPE)
            self.assertEqual(err.strerror, "Broken pipe")

        with tempfile.NamedTemporaryFile('rb') as err_f:
            self._test(
                TEST_BROKEN_PIPE_CHILD_SCRIPT.format(
                    stream=stream, err_fn=err_f.name
                ),
                stream,
                _pipes,
                check_output,
                post_child_check=post_child_check,
                stdin_generator=util.nullcontextmanager(subprocess.PIPE),
            )

    def test_broken_pipe_stdout(self):
        self._test_broken_pipe('stdout')

    def test_broken_pipe_stderr(self):
        self._test_broken_pipe('stderr')


# Entry point when this file is executed as a script.
# NOTE(review): `silenttestrunner` is not defined in this file; presumably a
# helper of the surrounding test suite that runs the tests of the named
# module -- confirm.
if __name__ == '__main__':
    import silenttestrunner

    silenttestrunner.main(__name__)