Mercurial > hg
view tests/test-stdio.py @ 45075:04ef381000a8
hooklib: fix detection of successors for changeset_obsoleted
Provide a hook for obsutil.getobsolete to be used with either a
transaction or the changes item of the transaction, since hooks only
have access to the latter. Use that to find the correct list of
revisions with obsmarkers, even new ones, and then filter out revisions
with known successors.
Move the processing from pretxnclose to txnclose as the transaction
access itself is no longer necessary. This is more in line with notify
and ensures that sanity checks can abort the transaction first.
Differential Revision: https://phab.mercurial-scm.org/D8575
author | Joerg Sonnenberger <joerg@bec.de> |
---|---|
date | Thu, 21 May 2020 18:18:50 +0200 |
parents | bc05c13e246f |
children | fa270dcbdb55 |
line wrap: on
line source
#!/usr/bin/env python """ Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`. """ from __future__ import absolute_import import contextlib import errno import os import subprocess import sys import unittest from mercurial import pycompat BUFFERING_CHILD_SCRIPT = r''' import os from mercurial import dispatch from mercurial.utils import procutil dispatch.initstdio() procutil.{stream}.write(b'aaa') os.write(procutil.{stream}.fileno(), b'[written aaa]') procutil.{stream}.write(b'bbb\n') os.write(procutil.{stream}.fileno(), b'[written bbb\\n]') ''' UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]' LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]' FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n' @contextlib.contextmanager def _closing(fds): try: yield finally: for fd in fds: try: os.close(fd) except EnvironmentError: pass @contextlib.contextmanager def _pipes(): rwpair = os.pipe() with _closing(rwpair): yield rwpair @contextlib.contextmanager def _ptys(): if pycompat.iswindows: raise unittest.SkipTest("PTYs are not supported on Windows") import pty import tty rwpair = pty.openpty() with _closing(rwpair): tty.setraw(rwpair[0]) yield rwpair def _readall(fd, buffer_size): buf = [] while True: try: s = os.read(fd, buffer_size) except OSError as e: if e.errno == errno.EIO: # If the child-facing PTY got closed, reading from the # parent-facing PTY raises EIO. break raise if not s: break buf.append(s) return b''.join(buf) class TestStdio(unittest.TestCase): def _test(self, stream, rwpair_generator, expected_output, python_args=[]): assert stream in ('stdout', 'stderr') with rwpair_generator() as (stream_receiver, child_stream), open( os.devnull, 'rb' ) as child_stdin: proc = subprocess.Popen( [sys.executable] + python_args + ['-c', BUFFERING_CHILD_SCRIPT.format(stream=stream)], stdin=child_stdin, stdout=child_stream if stream == 'stdout' else None, stderr=child_stream if stream == 'stderr' else None, ) try: os.close(child_stream) self.assertEqual( _readall(stream_receiver, 1024), expected_output ) except: # re-raises proc.terminate() raise finally: retcode = proc.wait() self.assertEqual(retcode, 0) def test_buffering_stdout_pipes(self): self._test('stdout', _pipes, FULLY_BUFFERED) def test_buffering_stdout_ptys(self): self._test('stdout', _ptys, LINE_BUFFERED) def test_buffering_stdout_pipes_unbuffered(self): self._test('stdout', _pipes, UNBUFFERED, python_args=['-u']) def test_buffering_stdout_ptys_unbuffered(self): self._test('stdout', _ptys, UNBUFFERED, python_args=['-u']) if not pycompat.ispy3 and not pycompat.iswindows: # On Python 2 on non-Windows, we manually open stdout in line-buffered # mode if connected to a TTY. We should check if Python was configured # to use unbuffered stdout, but it's hard to do that. test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure( test_buffering_stdout_ptys_unbuffered ) def test_buffering_stderr_pipes(self): self._test('stderr', _pipes, UNBUFFERED) def test_buffering_stderr_ptys(self): self._test('stderr', _ptys, UNBUFFERED) def test_buffering_stderr_pipes_unbuffered(self): self._test('stderr', _pipes, UNBUFFERED, python_args=['-u']) def test_buffering_stderr_ptys_unbuffered(self): self._test('stderr', _ptys, UNBUFFERED, python_args=['-u']) if __name__ == '__main__': import silenttestrunner silenttestrunner.main(__name__)