view tests/test-stdio.py @ 47315:825d5a5907b4

exewrapper: avoid directly linking against python3X.dll Subsequent code calls `LoadLibrary()` to attempt to load the DLL, but because of this symbol reference, there is an attempt to load the DLL used during the build prior to `_main()` running. This causes the whole process to fail if the DLL isn't in the standard search path. That also means it will never load the DLL for HackableMercurial. (Maybe we should get rid of that for py3, since you can install python for a user without admin rights?) This could also be resolved by calling `GetProcAddress()` on the symbol and dereferencing it, but using the environment variable is consistent with the *.bat file since fc8a5c9ecee0. (The environment variable persists after the interpreter is initialized.) Far more concerning is somehow I've gotten my system into a state where setting the flag causes any output to the pager to be lost (as if it wasn't set at all) in MSYS, cmd.exe, WSL, and PowerShell using py3.9.0, but the environment variable works properly. I'm sure this flag worked on some versions of py3, so I'm not sure what's going on here. This is might be related to init config related changes in 3.8[1], since it works with 3.7.8, but fails with 3.8.1. Somebody who understands encoding issues better than I do should give some thought to if we need to make some changes to our encoding strategy on Windows with py3. With or without the flag/envvar, there is proper output if the command is directly paged by piping to `more.com` (in any environment) or `less` (in MSYS and WSL), or if paging is disabled with `--pager=no`. Legacy mode is required though when Mercurial decides to spin up a pager. [1] https://bugs.python.org/issue41941 Differential Revision: https://phab.mercurial-scm.org/D10756
author Matt Harbison <matt_harbison@yahoo.com>
date Tue, 11 May 2021 01:05:38 -0400
parents c102b704edb5
children 23f5ed6dbcb1
line wrap: on
line source

#!/usr/bin/env python3
"""
Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
"""
from __future__ import absolute_import

import contextlib
import errno
import os
import signal
import subprocess
import sys
import tempfile
import unittest

from mercurial import pycompat, util


if pycompat.ispy3:

    def set_noninheritable(fd):
        # On Python 3, file descriptors are non-inheritable by default.
        pass


else:
    if pycompat.iswindows:
        # unused
        set_noninheritable = None
    else:
        import fcntl

        def set_noninheritable(fd):
            old = fcntl.fcntl(fd, fcntl.F_GETFD)
            fcntl.fcntl(fd, fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)


TEST_BUFFERING_CHILD_SCRIPT = r'''
import os

from mercurial import dispatch
from mercurial.utils import procutil

dispatch.initstdio()
procutil.{stream}.write(b'aaa')
os.write(procutil.{stream}.fileno(), b'[written aaa]')
procutil.{stream}.write(b'bbb\n')
os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
'''
UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'


TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
import os
import signal
import sys

from mercurial import dispatch
from mercurial.utils import procutil

signal.signal(signal.SIGINT, lambda *x: None)
dispatch.initstdio()
write_result = procutil.{stream}.write(b'x' * 1048576)
with os.fdopen(
    os.open({write_result_fn!r}, os.O_WRONLY | getattr(os, 'O_TEMPORARY', 0)),
    'w',
) as write_result_f:
    write_result_f.write(str(write_result))
'''


TEST_BROKEN_PIPE_CHILD_SCRIPT = r'''
import os
import pickle

from mercurial import dispatch
from mercurial.utils import procutil

dispatch.initstdio()
procutil.stdin.read(1)  # wait until parent process closed pipe
try:
    procutil.{stream}.write(b'test')
    procutil.{stream}.flush()
except EnvironmentError as e:
    with os.fdopen(
        os.open(
            {err_fn!r},
            os.O_WRONLY
            | getattr(os, 'O_BINARY', 0)
            | getattr(os, 'O_TEMPORARY', 0),
        ),
        'wb',
    ) as err_f:
        pickle.dump(e, err_f)
# Exit early to suppress further broken pipe errors at interpreter shutdown.
os._exit(0)
'''


@contextlib.contextmanager
def _closing(fds):
    try:
        yield
    finally:
        for fd in fds:
            try:
                os.close(fd)
            except EnvironmentError:
                pass


# In the following, we set the FDs non-inheritable mainly to make it possible
# for tests to close the receiving end of the pipe / PTYs.


@contextlib.contextmanager
def _devnull():
    devnull = os.open(os.devnull, os.O_WRONLY)
    # We don't have a receiving end, so it's not worth the effort on Python 2
    # on Windows to make the FD non-inheritable.
    with _closing([devnull]):
        yield (None, devnull)


@contextlib.contextmanager
def _pipes():
    rwpair = os.pipe()
    # Pipes are already non-inheritable on Windows.
    if not pycompat.iswindows:
        set_noninheritable(rwpair[0])
        set_noninheritable(rwpair[1])
    with _closing(rwpair):
        yield rwpair


@contextlib.contextmanager
def _ptys():
    if pycompat.iswindows:
        raise unittest.SkipTest("PTYs are not supported on Windows")
    import pty
    import tty

    rwpair = pty.openpty()
    set_noninheritable(rwpair[0])
    set_noninheritable(rwpair[1])
    with _closing(rwpair):
        tty.setraw(rwpair[0])
        yield rwpair


def _readall(fd, buffer_size, initial_buf=None):
    buf = initial_buf or []
    while True:
        try:
            s = os.read(fd, buffer_size)
        except OSError as e:
            if e.errno == errno.EIO:
                # If the child-facing PTY got closed, reading from the
                # parent-facing PTY raises EIO.
                break
            raise
        if not s:
            break
        buf.append(s)
    return b''.join(buf)


class TestStdio(unittest.TestCase):
    def _test(
        self,
        child_script,
        stream,
        rwpair_generator,
        check_output,
        python_args=[],
        post_child_check=None,
        stdin_generator=None,
    ):
        assert stream in ('stdout', 'stderr')
        if stdin_generator is None:
            stdin_generator = open(os.devnull, 'rb')
        with rwpair_generator() as (
            stream_receiver,
            child_stream,
        ), stdin_generator as child_stdin:
            proc = subprocess.Popen(
                [sys.executable] + python_args + ['-c', child_script],
                stdin=child_stdin,
                stdout=child_stream if stream == 'stdout' else None,
                stderr=child_stream if stream == 'stderr' else None,
            )
            try:
                os.close(child_stream)
                if stream_receiver is not None:
                    check_output(stream_receiver, proc)
            except:  # re-raises
                proc.terminate()
                raise
            finally:
                retcode = proc.wait()
            self.assertEqual(retcode, 0)
            if post_child_check is not None:
                post_child_check()

    def _test_buffering(
        self, stream, rwpair_generator, expected_output, python_args=[]
    ):
        def check_output(stream_receiver, proc):
            self.assertEqual(_readall(stream_receiver, 1024), expected_output)

        self._test(
            TEST_BUFFERING_CHILD_SCRIPT.format(stream=stream),
            stream,
            rwpair_generator,
            check_output,
            python_args,
        )

    def test_buffering_stdout_devnull(self):
        self._test_buffering('stdout', _devnull, None)

    def test_buffering_stdout_pipes(self):
        self._test_buffering('stdout', _pipes, FULLY_BUFFERED)

    def test_buffering_stdout_ptys(self):
        self._test_buffering('stdout', _ptys, LINE_BUFFERED)

    def test_buffering_stdout_devnull_unbuffered(self):
        self._test_buffering('stdout', _devnull, None, python_args=['-u'])

    def test_buffering_stdout_pipes_unbuffered(self):
        self._test_buffering('stdout', _pipes, UNBUFFERED, python_args=['-u'])

    def test_buffering_stdout_ptys_unbuffered(self):
        self._test_buffering('stdout', _ptys, UNBUFFERED, python_args=['-u'])

    if not pycompat.ispy3 and not pycompat.iswindows:
        # On Python 2 on non-Windows, we manually open stdout in line-buffered
        # mode if connected to a TTY. We should check if Python was configured
        # to use unbuffered stdout, but it's hard to do that.
        test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
            test_buffering_stdout_ptys_unbuffered
        )

    def _test_large_write(self, stream, rwpair_generator, python_args=[]):
        if not pycompat.ispy3 and pycompat.isdarwin:
            # Python 2 doesn't always retry on EINTR, but the libc might retry.
            # So far, it was observed only on macOS that EINTR is raised at the
            # Python level. As Python 2 support will be dropped soon-ish, we
            # won't attempt to fix it.
            raise unittest.SkipTest("raises EINTR on macOS")

        def check_output(stream_receiver, proc):
            if not pycompat.iswindows:
                # On Unix, we can provoke a partial write() by interrupting it
                # by a signal handler as soon as a bit of data was written.
                # We test that write() is called until all data is written.
                buf = [os.read(stream_receiver, 1)]
                proc.send_signal(signal.SIGINT)
            else:
                # On Windows, there doesn't seem to be a way to cause partial
                # writes.
                buf = []
            self.assertEqual(
                _readall(stream_receiver, 131072, buf), b'x' * 1048576
            )

        def post_child_check():
            write_result_str = write_result_f.read()
            if pycompat.ispy3:
                # On Python 3, we test that the correct number of bytes is
                # claimed to have been written.
                expected_write_result_str = '1048576'
            else:
                # On Python 2, we only check that the large write does not
                # crash.
                expected_write_result_str = 'None'
            self.assertEqual(write_result_str, expected_write_result_str)

        with tempfile.NamedTemporaryFile('r') as write_result_f:
            self._test(
                TEST_LARGE_WRITE_CHILD_SCRIPT.format(
                    stream=stream, write_result_fn=write_result_f.name
                ),
                stream,
                rwpair_generator,
                check_output,
                python_args,
                post_child_check=post_child_check,
            )

    def test_large_write_stdout_devnull(self):
        self._test_large_write('stdout', _devnull)

    def test_large_write_stdout_pipes(self):
        self._test_large_write('stdout', _pipes)

    def test_large_write_stdout_ptys(self):
        self._test_large_write('stdout', _ptys)

    def test_large_write_stdout_devnull_unbuffered(self):
        self._test_large_write('stdout', _devnull, python_args=['-u'])

    def test_large_write_stdout_pipes_unbuffered(self):
        self._test_large_write('stdout', _pipes, python_args=['-u'])

    def test_large_write_stdout_ptys_unbuffered(self):
        self._test_large_write('stdout', _ptys, python_args=['-u'])

    def test_large_write_stderr_devnull(self):
        self._test_large_write('stderr', _devnull)

    def test_large_write_stderr_pipes(self):
        self._test_large_write('stderr', _pipes)

    def test_large_write_stderr_ptys(self):
        self._test_large_write('stderr', _ptys)

    def test_large_write_stderr_devnull_unbuffered(self):
        self._test_large_write('stderr', _devnull, python_args=['-u'])

    def test_large_write_stderr_pipes_unbuffered(self):
        self._test_large_write('stderr', _pipes, python_args=['-u'])

    def test_large_write_stderr_ptys_unbuffered(self):
        self._test_large_write('stderr', _ptys, python_args=['-u'])

    def _test_broken_pipe(self, stream):
        assert stream in ('stdout', 'stderr')

        def check_output(stream_receiver, proc):
            os.close(stream_receiver)
            proc.stdin.write(b'x')
            proc.stdin.close()

        def post_child_check():
            err = util.pickle.load(err_f)
            self.assertEqual(err.errno, errno.EPIPE)
            self.assertEqual(err.strerror, "Broken pipe")

        with tempfile.NamedTemporaryFile('rb') as err_f:
            self._test(
                TEST_BROKEN_PIPE_CHILD_SCRIPT.format(
                    stream=stream, err_fn=err_f.name
                ),
                stream,
                _pipes,
                check_output,
                post_child_check=post_child_check,
                stdin_generator=util.nullcontextmanager(subprocess.PIPE),
            )

    def test_broken_pipe_stdout(self):
        self._test_broken_pipe('stdout')

    def test_broken_pipe_stderr(self):
        self._test_broken_pipe('stderr')


if __name__ == '__main__':
    import silenttestrunner

    silenttestrunner.main(__name__)