view tests/test-stdio.py @ 45121:b6269741ed42

config: add option to control creation of empty successors during rewrite The default for many history-rewriting commands (e.g. rebase and absorb) is that changesets which would become empty are not created in the target branch. This makes sense if the source branch consists of small fix-up changes. For more advanced workflows that make heavy use of history-editing to create curated patch series, dropping empty changesets is not as important or even undesirable. Some users want to keep the meta-history, e.g. to make finding comments in a code review tool easier or to avoid that divergent bookmarks are created. For that, obsmarkers from the (to-be) empty changeset to the changeset(s) that already made the changes should be added. If a to-be empty changeset is pruned without a successor, adding the obsmarkers is hard because the changeset has to be found within the hidden part of the history. If rebasing in TortoiseHg, it’s easy to miss the fact that the to-be empty changeset was pruned. An empty changeset will function as a reminder that obsmarkers should be added. Martin von Zweigbergk mentioned another advantage. Stripping the successor will de-obsolete the predecessor. If no (empty) successor is created, this won’t be possible. In the future, we may want to consider other behaviors, like e.g. creating the empty successor, but pruning it right away. Therefore this configuration accepts 'skip' and 'keep' instead of being a boolean configuration.
author Manuel Jacob <me@manueljacob.de>
date Sat, 11 Jul 2020 23:53:27 +0200
parents eb26a9cf7821
children c2c862b9b544
line wrap: on
line source

#!/usr/bin/env python
"""
Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
"""
from __future__ import absolute_import

import contextlib
import errno
import os
import signal
import subprocess
import sys
import tempfile
import unittest

from mercurial import pycompat


# Child script template for the buffering tests. `{stream}` is replaced with
# the stream name via str.format(). The child writes through the `procutil`
# stream object and, after each of these writes, writes a marker directly to
# the same file descriptor with os.write(). The position of the markers
# relative to the stream data in the combined output therefore shows when the
# stream data actually reached the file descriptor.
TEST_BUFFERING_CHILD_SCRIPT = r'''
import os

from mercurial import dispatch
from mercurial.utils import procutil

dispatch.initstdio()
procutil.{stream}.write(b'aaa')
os.write(procutil.{stream}.fileno(), b'[written aaa]')
procutil.{stream}.write(b'bbb\n')
os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
'''
# Expected combined output for each buffering mode.
# Unbuffered: each piece of stream data appears before its marker.
UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
# Line-buffered: 'aaa' appears only after the first marker, together with the
# newline-terminated 'bbb\n', and before the second marker.
LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
# Fully buffered: all stream data appears only after both markers.
FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'


# Child script template for the large-write tests. `{stream}` is replaced with
# the stream name and `{write_result_fn!r}` with the repr() of the path of the
# file that receives the result. The child installs a no-op SIGINT handler (so
# that a SIGINT sent by the parent interrupts the child without killing it),
# writes 1 MiB (1048576 bytes) of b'x' to the `procutil` stream with a single
# write() call and stores str() of that call's return value in the result
# file.
# os.O_TEMPORARY is OR-ed into the open flags where the `os` module provides
# it and 0 is used otherwise -- presumably this is required on Windows to
# reopen a file created with tempfile.NamedTemporaryFile; TODO confirm.
TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
import os
import signal
import sys

from mercurial import dispatch
from mercurial.utils import procutil

signal.signal(signal.SIGINT, lambda *x: None)
dispatch.initstdio()
write_result = procutil.{stream}.write(b'x' * 1048576)
with os.fdopen(
    os.open({write_result_fn!r}, os.O_WRONLY | getattr(os, 'O_TEMPORARY', 0)),
    'w',
) as write_result_f:
    write_result_f.write(str(write_result))
'''


@contextlib.contextmanager
def _closing(fds):
    """Context manager that closes every file descriptor in `fds` on exit.

    The descriptors are closed whether or not the `with` body raised. Errors
    raised by os.close() itself (e.g. because a descriptor was already closed
    inside the `with` body) are ignored.
    """

    def close_quietly(descriptor):
        # Best-effort close: a failing close must not mask the outcome of
        # the `with` body.
        try:
            os.close(descriptor)
        except EnvironmentError:
            pass

    try:
        yield
    finally:
        for descriptor in fds:
            close_quietly(descriptor)


@contextlib.contextmanager
def _devnull():
    """Yield `(None, fd)`, where `fd` is the null device opened write-only.

    The first element is None because there is no descriptor to read the
    written data back from. `fd` is closed when the context is left.
    """
    null_fd = os.open(os.devnull, os.O_WRONLY)
    pair = (None, null_fd)
    with _closing([null_fd]):
        yield pair


@contextlib.contextmanager
def _pipes():
    """Yield the `(read_fd, write_fd)` pair of a newly created pipe.

    Both descriptors are closed when the context is left.
    """
    read_fd, write_fd = os.pipe()
    with _closing((read_fd, write_fd)):
        yield (read_fd, write_fd)


@contextlib.contextmanager
def _ptys():
    """Yield the descriptor pair of a newly opened pseudo-terminal.

    The pair is what pty.openpty() returns; its first descriptor is switched
    to raw mode before it is handed out. Both descriptors are closed when the
    context is left. Raises unittest.SkipTest on Windows.
    """
    if pycompat.iswindows:
        raise unittest.SkipTest("PTYs are not supported on Windows")
    # Imported only after the platform check above has passed.
    import pty
    import tty

    parent_fd, child_fd = pty.openpty()
    with _closing((parent_fd, child_fd)):
        # NOTE(review): raw mode presumably keeps the terminal layer from
        # transforming the bytes under test (e.g. newline translation) --
        # confirm.
        tty.setraw(parent_fd)
        yield (parent_fd, child_fd)


def _readall(fd, buffer_size, initial_buf=None):
    """Read from `fd` until end of data and return everything as bytes.

    fd: file descriptor to read from with os.read().
    buffer_size: maximum number of bytes requested per os.read() call.
    initial_buf: optional iterable of bytes objects that were already read;
        they are put in front of the returned data. The passed object is
        copied and never modified.

    Reading stops when os.read() returns an empty result or raises an OSError
    with errno EIO. Any other OSError is propagated.
    """
    # Work on a private copy so that the caller's list is never appended to.
    chunks = list(initial_buf) if initial_buf else []
    while True:
        try:
            chunk = os.read(fd, buffer_size)
        except OSError as e:
            if e.errno != errno.EIO:
                raise
            # If the child-facing PTY got closed, reading from the
            # parent-facing PTY raises EIO.
            break
        if not chunk:
            break
        chunks.append(chunk)
    return b''.join(chunks)


class TestStdio(unittest.TestCase):
    """Tests for the stdio streams set up by `dispatch.initstdio()`.

    Each test runs a small child script in a separate Python process whose
    stdout or stderr is connected to the null device, a pipe or a PTY, and
    checks what the parent can read from the other end.
    """

    def _test(
        self,
        child_script,
        stream,
        rwpair_generator,
        check_output,
        python_args=None,
        post_child_check=None,
    ):
        """Run `child_script` in a child Python process and check its output.

        child_script: source code passed to the child interpreter via `-c`.
        stream: 'stdout' or 'stderr'; selects which of the child's streams is
            connected to the descriptor under test.
        rwpair_generator: callable returning a context manager that yields
            `(stream_receiver, child_stream)`. `stream_receiver` may be None
            if nothing can be read back.
        check_output: called as `check_output(stream_receiver, proc)` after
            the child was started, unless `stream_receiver` is None.
        python_args: optional sequence of extra interpreter arguments that
            are placed before `-c`.
        post_child_check: optional callable invoked without arguments after
            the child exited with status 0.
        """
        assert stream in ('stdout', 'stderr')
        # `None` is used as the default instead of a shared mutable list.
        python_args = list(python_args) if python_args else []
        with rwpair_generator() as (stream_receiver, child_stream), open(
            os.devnull, 'rb'
        ) as child_stdin:
            proc = subprocess.Popen(
                [sys.executable] + python_args + ['-c', child_script],
                stdin=child_stdin,
                stdout=child_stream if stream == 'stdout' else None,
                stderr=child_stream if stream == 'stderr' else None,
            )
            try:
                # Close the parent's copy of the child-facing descriptor so
                # that only the child keeps it open while output is read.
                os.close(child_stream)
                if stream_receiver is not None:
                    check_output(stream_receiver, proc)
            except:  # re-raises
                proc.terminate()
                raise
            finally:
                # Always wait for the child, whether the check passed or not.
                retcode = proc.wait()
            self.assertEqual(retcode, 0)
            if post_child_check is not None:
                post_child_check()

    def _test_buffering(
        self, stream, rwpair_generator, expected_output, python_args=None
    ):
        """Run the buffering child script and compare everything read from
        the receiving descriptor with `expected_output`.

        `expected_output` is not looked at if the pair generator yields no
        receiving descriptor.
        """

        def check_output(stream_receiver, proc):
            self.assertEqual(_readall(stream_receiver, 1024), expected_output)

        self._test(
            TEST_BUFFERING_CHILD_SCRIPT.format(stream=stream),
            stream,
            rwpair_generator,
            check_output,
            python_args,
        )

    def test_buffering_stdout_devnull(self):
        self._test_buffering('stdout', _devnull, None)

    def test_buffering_stdout_pipes(self):
        self._test_buffering('stdout', _pipes, FULLY_BUFFERED)

    def test_buffering_stdout_ptys(self):
        self._test_buffering('stdout', _ptys, LINE_BUFFERED)

    def test_buffering_stdout_devnull_unbuffered(self):
        self._test_buffering('stdout', _devnull, None, python_args=['-u'])

    def test_buffering_stdout_pipes_unbuffered(self):
        self._test_buffering('stdout', _pipes, UNBUFFERED, python_args=['-u'])

    def test_buffering_stdout_ptys_unbuffered(self):
        self._test_buffering('stdout', _ptys, UNBUFFERED, python_args=['-u'])

    if not pycompat.ispy3 and not pycompat.iswindows:
        # On Python 2 on non-Windows, we manually open stdout in line-buffered
        # mode if connected to a TTY. We should check if Python was configured
        # to use unbuffered stdout, but it's hard to do that.
        test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
            test_buffering_stdout_ptys_unbuffered
        )

    def _test_large_write(self, stream, rwpair_generator, python_args=None):
        """Run the large-write child script and check that all 1048576 bytes
        arrive and that the child recorded the expected write() result.

        Raises unittest.SkipTest on Python 2 on macOS.
        """
        if not pycompat.ispy3 and pycompat.isdarwin:
            # Python 2 doesn't always retry on EINTR, but the libc might retry.
            # So far, it was observed only on macOS that EINTR is raised at the
            # Python level. As Python 2 support will be dropped soon-ish, we
            # won't attempt to fix it.
            raise unittest.SkipTest("raises EINTR on macOS")

        def check_output(stream_receiver, proc):
            if not pycompat.iswindows:
                # On Unix, we can provoke a partial write() by interrupting it
                # by a signal handler as soon as a bit of data was written.
                # We test that write() is called until all data is written.
                buf = [os.read(stream_receiver, 1)]
                proc.send_signal(signal.SIGINT)
            else:
                # On Windows, there doesn't seem to be a way to cause partial
                # writes.
                buf = []
            self.assertEqual(
                _readall(stream_receiver, 131072, buf), b'x' * 1048576
            )

        def post_child_check():
            # `write_result_f` is bound by the `with` statement below before
            # this function is called.
            write_result_str = write_result_f.read()
            if pycompat.ispy3:
                # On Python 3, we test that the correct number of bytes is
                # claimed to have been written.
                expected_write_result_str = '1048576'
            else:
                # On Python 2, we only check that the large write does not
                # crash.
                expected_write_result_str = 'None'
            self.assertEqual(write_result_str, expected_write_result_str)

        with tempfile.NamedTemporaryFile('r') as write_result_f:
            self._test(
                TEST_LARGE_WRITE_CHILD_SCRIPT.format(
                    stream=stream, write_result_fn=write_result_f.name
                ),
                stream,
                rwpair_generator,
                check_output,
                python_args,
                post_child_check=post_child_check,
            )

    def test_large_write_stdout_devnull(self):
        self._test_large_write('stdout', _devnull)

    def test_large_write_stdout_pipes(self):
        self._test_large_write('stdout', _pipes)

    def test_large_write_stdout_ptys(self):
        self._test_large_write('stdout', _ptys)

    def test_large_write_stdout_devnull_unbuffered(self):
        self._test_large_write('stdout', _devnull, python_args=['-u'])

    def test_large_write_stdout_pipes_unbuffered(self):
        self._test_large_write('stdout', _pipes, python_args=['-u'])

    def test_large_write_stdout_ptys_unbuffered(self):
        self._test_large_write('stdout', _ptys, python_args=['-u'])

    def test_large_write_stderr_devnull(self):
        self._test_large_write('stderr', _devnull)

    def test_large_write_stderr_pipes(self):
        self._test_large_write('stderr', _pipes)

    def test_large_write_stderr_ptys(self):
        self._test_large_write('stderr', _ptys)

    def test_large_write_stderr_devnull_unbuffered(self):
        self._test_large_write('stderr', _devnull, python_args=['-u'])

    def test_large_write_stderr_pipes_unbuffered(self):
        self._test_large_write('stderr', _pipes, python_args=['-u'])

    def test_large_write_stderr_ptys_unbuffered(self):
        self._test_large_write('stderr', _ptys, python_args=['-u'])


# When executed as a script, run the tests of this module through
# `silenttestrunner`, which is imported only in that case.
if __name__ == '__main__':
    import silenttestrunner

    silenttestrunner.main(__name__)