view tests/test-stdio.py @ 45049:513b3ef277a3

rhg: add RootCommand using hg-core FindRoot operation to prepare `hg root`. The println macro is not used to avoid string usage. Dealing only with bytes allows us to be compatible with any encoding and not just UTF-8. Later on, format macro will be made to have more readable code. Differential Revision: https://phab.mercurial-scm.org/D8612
author Antoine Cezar <antoine.cezar@octobus.net>
date Fri, 05 Jun 2020 09:01:35 +0200
parents 8403cc54bc83
children 8cd18aba5e6c
line wrap: on
line source

#!/usr/bin/env python
"""
Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
"""
from __future__ import absolute_import

import contextlib
import os
import subprocess
import sys
import unittest

from mercurial import pycompat


# Source template of the child program; ``{stream}`` is substituted with
# str.format() before use. After each write through the procutil stream
# object, a marker is written straight to the same file descriptor with
# os.write(), so the position of the markers in the combined output shows when
# the data written through the stream object actually reached the descriptor.
# The literal is raw, so the backslash escapes below are interpreted by the
# child interpreter rather than by this module.
CHILD_PROCESS = r'''
import os

from mercurial import dispatch
from mercurial.utils import procutil

dispatch.initstdio()
procutil.{stream}.write(b'aaa')
os.write(procutil.{stream}.fileno(), b'[written aaa]')
procutil.{stream}.write(b'bbb\n')
os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
'''
# Expected combined output for each buffering mode.
# Unbuffered: every write shows up before its own marker.
UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
# Line-buffered: 'aaa' only shows up once the newline-terminated 'bbb\n' is
# written, i.e. after the first marker but before the second.
LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
# Fully buffered: both markers come first; the stream data follows them.
FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'


@contextlib.contextmanager
def _closing(fds):
    """Close every file descriptor in ``fds`` when the ``with`` block ends.

    The descriptors are closed whether the block finishes normally or raises.
    An ``EnvironmentError`` from an individual close (for example because the
    descriptor was already closed) is ignored.
    """

    def _close_quietly(descriptor):
        # Best-effort close: a failure here must not mask the block's outcome.
        try:
            os.close(descriptor)
        except EnvironmentError:
            pass

    try:
        yield
    finally:
        for descriptor in fds:
            _close_quietly(descriptor)


@contextlib.contextmanager
def _pipes():
    """Yield the ``(read_fd, write_fd)`` pair of a new OS pipe.

    Both descriptors are closed when the ``with`` block is left.
    """
    read_fd, write_fd = os.pipe()
    pipe_fds = (read_fd, write_fd)
    with _closing(pipe_fds):
        yield pipe_fds


@contextlib.contextmanager
def _ptys():
    """Yield the descriptor pair returned by ``pty.openpty()``.

    The first descriptor of the pair is switched to raw mode before the pair
    is yielded (presumably so the terminal layer does not alter the bytes the
    test compares -- confirm). Both descriptors are closed when the ``with``
    block is left.

    Raises ``unittest.SkipTest`` on Windows.
    """
    if pycompat.iswindows:
        raise unittest.SkipTest("PTYs are not supported on Windows")
    # Imported here, after the Windows check above.
    import pty
    import tty

    first_fd, second_fd = pty.openpty()
    pty_fds = (first_fd, second_fd)
    with _closing(pty_fds):
        tty.setraw(first_fd)
        yield pty_fds


class TestStdio(unittest.TestCase):
    """Check the buffering behavior of the stdio streams in a child process.

    Each test runs the CHILD_PROCESS program in a new interpreter with the
    stream under test attached to a pipe or a PTY, then compares the bytes
    received with the ordering expected for a given buffering mode.
    """

    def _test(self, stream, rwpair_generator, expected_output, python_args=()):
        """Run the child program and compare its output on ``stream``.

        ``stream`` is 'stdout' or 'stderr' and selects which child stream is
        exercised and redirected.

        ``rwpair_generator`` is a callable returning a context manager that
        yields a pair of file descriptors: the first is read by this process,
        the second is handed to the child as the selected stream.

        ``expected_output`` is the exact bytes expected from the child.

        ``python_args`` is a sequence of extra interpreter arguments placed
        before ``-c``. The default is an immutable empty tuple so that no
        mutable object is shared between calls.
        """
        assert stream in ('stdout', 'stderr')
        with rwpair_generator() as (stream_receiver, child_stream), open(
            os.devnull, 'rb'
        ) as child_stdin:
            proc = subprocess.Popen(
                [sys.executable]
                + list(python_args)
                + ['-c', CHILD_PROCESS.format(stream=stream)],
                stdin=child_stdin,
                # Only the stream under test is redirected; the other one is
                # passed as None.
                stdout=child_stream if stream == 'stdout' else None,
                stderr=child_stream if stream == 'stderr' else None,
            )
            # The child is waited for before its output is read in a single
            # os.read() call.
            retcode = proc.wait()
            self.assertEqual(retcode, 0)
            self.assertEqual(os.read(stream_receiver, 1024), expected_output)

    def test_stdout_pipes(self):
        self._test('stdout', _pipes, FULLY_BUFFERED)

    def test_stdout_ptys(self):
        self._test('stdout', _ptys, LINE_BUFFERED)

    def test_stdout_pipes_unbuffered(self):
        self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])

    def test_stdout_ptys_unbuffered(self):
        self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])

    if not pycompat.ispy3 and not pycompat.iswindows:
        # On Python 2 on non-Windows, we manually open stdout in line-buffered
        # mode if connected to a TTY. We should check if Python was configured
        # to use unbuffered stdout, but it's hard to do that.
        test_stdout_ptys_unbuffered = unittest.expectedFailure(
            test_stdout_ptys_unbuffered
        )

    def test_stderr_pipes(self):
        self._test('stderr', _pipes, UNBUFFERED)

    def test_stderr_ptys(self):
        self._test('stderr', _ptys, UNBUFFERED)

    def test_stderr_pipes_unbuffered(self):
        self._test('stderr', _pipes, UNBUFFERED, python_args=['-u'])

    def test_stderr_ptys_unbuffered(self):
        self._test('stderr', _ptys, UNBUFFERED, python_args=['-u'])


if __name__ == '__main__':
    # Imported only when this file is executed as a script. silenttestrunner
    # is not a standard-library module; presumably it is the runner helper
    # shipped alongside Mercurial's tests -- confirm.
    import silenttestrunner

    # Hand this module's name to the runner.
    silenttestrunner.main(__name__)