view tests/test-stdio.py @ 45074:797ef6f8295e

fix: prefetch file contents. This prevents the worker subprocesses from contacting the server individually, which is either inefficient or leads to problems if the connection is shared among them. Differential Revision: https://phab.mercurial-scm.org/D8723
author Rodrigo Damazio Bovendorp <rdamazio@google.com>
date Thu, 09 Jul 2020 20:46:52 -0700
parents bc05c13e246f
children fa270dcbdb55
line wrap: on
line source

#!/usr/bin/env python
"""
Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
"""
from __future__ import absolute_import

import contextlib
import errno
import os
import subprocess
import sys
import unittest

from mercurial import pycompat


# Source of the child process started by the tests. `{stream}` is substituted
# with 'stdout' or 'stderr' through str.format(). The child writes to the
# procutil stream and, after each of those writes, emits a marker directly on
# the stream's file descriptor with os.write(). The position of the markers
# relative to the stream data in the captured output reveals when the stream
# data actually reached the descriptor.
BUFFERING_CHILD_SCRIPT = r'''
import os

from mercurial import dispatch
from mercurial.utils import procutil

dispatch.initstdio()
procutil.{stream}.write(b'aaa')
os.write(procutil.{stream}.fileno(), b'[written aaa]')
procutil.{stream}.write(b'bbb\n')
os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
'''
# Expected captured output for each buffering mode.
# Unbuffered: each stream write shows up before the marker that follows it.
UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
# Line-buffered: stream data is held back until the newline has been written.
LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
# Fully buffered: stream data only shows up after both markers.
FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'


@contextlib.contextmanager
def _closing(fds):
    try:
        yield
    finally:
        for fd in fds:
            try:
                os.close(fd)
            except EnvironmentError:
                pass


@contextlib.contextmanager
def _pipes():
    """Yield the ``(read_fd, write_fd)`` pair of a newly created OS pipe.

    Both descriptors are handed to ``_closing`` so they are closed when the
    context exits.
    """
    read_fd, write_fd = os.pipe()
    with _closing((read_fd, write_fd)):
        yield read_fd, write_fd


@contextlib.contextmanager
def _ptys():
    """Yield the descriptor pair of a newly opened pseudo-terminal.

    The pair is whatever ``pty.openpty()`` returns; its first descriptor is
    switched to raw mode before being yielded. Both descriptors are handed to
    ``_closing`` so they are closed when the context exits.

    Raises ``unittest.SkipTest`` on Windows.
    """
    if pycompat.iswindows:
        raise unittest.SkipTest("PTYs are not supported on Windows")
    # Imported only after the platform check above.
    import pty
    import tty

    master_fd, slave_fd = pty.openpty()
    with _closing((master_fd, slave_fd)):
        tty.setraw(master_fd)
        yield master_fd, slave_fd


def _readall(fd, buffer_size):
    buf = []
    while True:
        try:
            s = os.read(fd, buffer_size)
        except OSError as e:
            if e.errno == errno.EIO:
                # If the child-facing PTY got closed, reading from the
                # parent-facing PTY raises EIO.
                break
            raise
        if not s:
            break
        buf.append(s)
    return b''.join(buf)


class TestStdio(unittest.TestCase):
    """Checks the buffering of the child's stdout/stderr after
    ``dispatch.initstdio()``, with the stream connected to a pipe or a PTY.
    """

    def _test(
        self, stream, rwpair_generator, expected_output, python_args=None
    ):
        """Run the child script and compare what it emitted on ``stream``.

        ``stream`` is 'stdout' or 'stderr'. ``rwpair_generator`` is a
        context-manager factory yielding ``(stream_receiver, child_stream)``
        descriptors: the child gets ``child_stream`` as the tested stream and
        the output is read back from ``stream_receiver``. ``expected_output``
        is the exact bytes expected. ``python_args`` is an optional sequence
        of extra interpreter arguments placed before ``-c``.

        Fails the test if the output differs or the child exits non-zero.
        """
        assert stream in ('stdout', 'stderr')
        # Use a None sentinel rather than a shared mutable default list, and
        # copy so the caller's sequence is never aliased.
        extra_args = [] if python_args is None else list(python_args)
        with rwpair_generator() as (stream_receiver, child_stream), open(
            os.devnull, 'rb'
        ) as child_stdin:
            proc = subprocess.Popen(
                [sys.executable]
                + extra_args
                + ['-c', BUFFERING_CHILD_SCRIPT.format(stream=stream)],
                stdin=child_stdin,
                stdout=child_stream if stream == 'stdout' else None,
                stderr=child_stream if stream == 'stderr' else None,
            )
            try:
                # Drop the parent's copy of the child-facing descriptor so
                # that reading from the receiver can reach end of stream once
                # the child is done with it.
                os.close(child_stream)
                self.assertEqual(
                    _readall(stream_receiver, 1024), expected_output
                )
            except:  # re-raises
                # Stop the child before the wait() below.
                proc.terminate()
                raise
            finally:
                retcode = proc.wait()
            self.assertEqual(retcode, 0)

    def test_buffering_stdout_pipes(self):
        self._test('stdout', _pipes, FULLY_BUFFERED)

    def test_buffering_stdout_ptys(self):
        self._test('stdout', _ptys, LINE_BUFFERED)

    def test_buffering_stdout_pipes_unbuffered(self):
        self._test('stdout', _pipes, UNBUFFERED, python_args=['-u'])

    def test_buffering_stdout_ptys_unbuffered(self):
        self._test('stdout', _ptys, UNBUFFERED, python_args=['-u'])

    if not pycompat.ispy3 and not pycompat.iswindows:
        # On Python 2 on non-Windows, we manually open stdout in line-buffered
        # mode if connected to a TTY. We should check if Python was configured
        # to use unbuffered stdout, but it's hard to do that.
        test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
            test_buffering_stdout_ptys_unbuffered
        )

    def test_buffering_stderr_pipes(self):
        self._test('stderr', _pipes, UNBUFFERED)

    def test_buffering_stderr_ptys(self):
        self._test('stderr', _ptys, UNBUFFERED)

    def test_buffering_stderr_pipes_unbuffered(self):
        self._test('stderr', _pipes, UNBUFFERED, python_args=['-u'])

    def test_buffering_stderr_ptys_unbuffered(self):
        self._test('stderr', _ptys, UNBUFFERED, python_args=['-u'])


# When executed as a script, run this module's tests through silenttestrunner.
if __name__ == '__main__':
    import silenttestrunner

    silenttestrunner.main(__name__)