Mercurial > hg
view tests/test-stdio.py @ 46607:e9901d01d135
revlog: add a mechanism to verify expected file position before appending
If someone uses `hg debuglocks`, or some non-hg process writes to the .hg
directory without respecting the locks, or if the repo's on a networked
filesystem, it's possible for the revlog code to write out corrupted data.
The form of this corruption can vary depending on what data was written and how
that happened. We are in the "networked filesystem" case (though I've had users
also do this to themselves with the "`hg debuglocks`" scenario), and most often
see this with the changelog. What ends up happening is we produce two items
(let's call them rev1 and rev2) in the .i file that have the same linkrev,
baserev, and offset into the .d file, while the data in the .d file is appended
properly. rev2's compressed_size is accurate for rev2, but when we go to
decompress the data in the .d file, we use the offset that's recorded in the
index file, which is the same as rev1, and attempt to decompress
rev2.compressed_size bytes of rev1's data. This usually does not succeed. :)
When using inline data, this also fails, though I haven't investigated why too
closely. This shows up as a "patch decode" error. I believe what's happening
there is that we're basically ignoring the offset field, getting the data
properly, but since baserev != rev, it thinks this is a delta based on rev
(instead of a full text) and can't actually apply it as such.
For now, I'm going to make this an optional component and default it to entirely
off. I may increase the default severity of this in the future, once I've
enabled it for my users and we gain more experience with it. Luckily, most of my
users have a versioned filesystem and can roll back to before the corruption has
been written, it's just a hassle to do so and not everyone knows how (so it's a
support burden). Users on other filesystems will not have that luxury, and this
can cause them to have a corrupted repository that they are unlikely to know how
to resolve, and they'll see this as a data-loss event. Refusing to create the
corruption is a much better user experience.
This mechanism is not perfect. There may be false-negatives (racy writes that
are not detected). There should not be any false-positives (non-racy writes that
are incorrectly flagged as racy). This is not a mechanism that makes putting a repo on a
networked filesystem "safe" or "supported", just *less* likely to cause
corruption.
Differential Revision: https://phab.mercurial-scm.org/D9952
author | Kyle Lippincott <spectral@google.com> |
---|---|
date | Wed, 03 Feb 2021 16:33:10 -0800 |
parents | c102b704edb5 |
children | 23f5ed6dbcb1 |
line wrap: on
line source
#!/usr/bin/env python3
"""
Tests the buffering behavior of stdio streams in `mercurial.utils.procutil`.
"""
from __future__ import absolute_import

import contextlib
import errno
import os
import signal
import subprocess
import sys
import tempfile
import unittest

from mercurial import pycompat, util


if pycompat.ispy3:

    def set_noninheritable(fd):
        """No-op: on Python 3, file descriptors are non-inheritable by
        default (PEP 446)."""
        # On Python 3, file descriptors are non-inheritable by default.
        pass


else:
    if pycompat.iswindows:
        # unused
        set_noninheritable = None
    else:
        import fcntl

        def set_noninheritable(fd):
            """Mark *fd* close-on-exec so child processes don't inherit it."""
            old = fcntl.fcntl(fd, fcntl.F_GETFD)
            fcntl.fcntl(fd, fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)


# Child script that interleaves buffered writes (via the procutil stream)
# with unbuffered writes (via os.write() on the same FD). The relative order
# of the "[written ...]" markers in the captured output reveals the stream's
# buffering mode; the expected orderings are the three constants below.
TEST_BUFFERING_CHILD_SCRIPT = r'''
import os

from mercurial import dispatch
from mercurial.utils import procutil

dispatch.initstdio()
procutil.{stream}.write(b'aaa')
os.write(procutil.{stream}.fileno(), b'[written aaa]')
procutil.{stream}.write(b'bbb\n')
os.write(procutil.{stream}.fileno(), b'[written bbb\\n]')
'''
UNBUFFERED = b'aaa[written aaa]bbb\n[written bbb\\n]'
LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'


# Child script that performs a single 1 MiB write. SIGINT is ignored so the
# parent can interrupt the write mid-way (to provoke a partial write on
# Unix) without killing the child. The value returned by write() is recorded
# in a file whose name the parent substitutes in, for post-hoc checking.
TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
import os
import signal
import sys

from mercurial import dispatch
from mercurial.utils import procutil

signal.signal(signal.SIGINT, lambda *x: None)
dispatch.initstdio()
write_result = procutil.{stream}.write(b'x' * 1048576)
with os.fdopen(
    os.open({write_result_fn!r}, os.O_WRONLY | getattr(os, 'O_TEMPORARY', 0)),
    'w',
) as write_result_f:
    write_result_f.write(str(write_result))
'''


# Child script that waits for the parent to close the read end of the pipe,
# then writes, expecting EPIPE. The caught exception is pickled to a file
# whose name the parent substitutes in, so the parent can inspect it.
TEST_BROKEN_PIPE_CHILD_SCRIPT = r'''
import os
import pickle

from mercurial import dispatch
from mercurial.utils import procutil

dispatch.initstdio()
procutil.stdin.read(1)  # wait until parent process closed pipe
try:
    procutil.{stream}.write(b'test')
    procutil.{stream}.flush()
except EnvironmentError as e:
    with os.fdopen(
        os.open(
            {err_fn!r},
            os.O_WRONLY
            | getattr(os, 'O_BINARY', 0)
            | getattr(os, 'O_TEMPORARY', 0),
        ),
        'wb',
    ) as err_f:
        pickle.dump(e, err_f)
# Exit early to suppress further broken pipe errors at interpreter shutdown.
os._exit(0)
'''


@contextlib.contextmanager
def _closing(fds):
    """Context manager that best-effort closes every FD in *fds* on exit.

    Errors from os.close() (e.g. an FD already closed by the test body) are
    deliberately swallowed.
    """
    try:
        yield
    finally:
        for fd in fds:
            try:
                os.close(fd)
            except EnvironmentError:
                pass


# In the following, we set the FDs non-inheritable mainly to make it possible
# for tests to close the receiving end of the pipe / PTYs.


@contextlib.contextmanager
def _devnull():
    """Yield (None, write_fd) where write_fd is open on os.devnull.

    The receiver slot is None because there is nothing to read back.
    """
    devnull = os.open(os.devnull, os.O_WRONLY)
    # We don't have a receiving end, so it's not worth the effort on Python 2
    # on Windows to make the FD non-inheritable.
    with _closing([devnull]):
        yield (None, devnull)


@contextlib.contextmanager
def _pipes():
    """Yield (read_fd, write_fd) of a fresh anonymous pipe."""
    rwpair = os.pipe()
    # Pipes are already non-inheritable on Windows.
    if not pycompat.iswindows:
        set_noninheritable(rwpair[0])
        set_noninheritable(rwpair[1])
    with _closing(rwpair):
        yield rwpair


@contextlib.contextmanager
def _ptys():
    """Yield (parent_fd, child_fd) of a fresh pseudo-terminal pair.

    The parent-facing end is put into raw mode so the data read back is not
    altered by line-discipline processing. Skips the test on Windows.
    """
    if pycompat.iswindows:
        raise unittest.SkipTest("PTYs are not supported on Windows")
    import pty
    import tty

    rwpair = pty.openpty()
    set_noninheritable(rwpair[0])
    set_noninheritable(rwpair[1])
    with _closing(rwpair):
        tty.setraw(rwpair[0])
        yield rwpair


def _readall(fd, buffer_size, initial_buf=None):
    """Read from *fd* until EOF and return everything as one bytes object.

    *initial_buf*, if given, is a list of byte chunks to prepend (and is
    mutated in place). EIO from a PTY is treated as EOF.
    """
    buf = initial_buf or []
    while True:
        try:
            s = os.read(fd, buffer_size)
        except OSError as e:
            if e.errno == errno.EIO:
                # If the child-facing PTY got closed, reading from the
                # parent-facing PTY raises EIO.
                break
            raise
        if not s:
            break
        buf.append(s)
    return b''.join(buf)


class TestStdio(unittest.TestCase):
    def _test(
        self,
        child_script,
        stream,
        rwpair_generator,
        check_output,
        python_args=[],
        post_child_check=None,
        stdin_generator=None,
    ):
        """Run *child_script* in a subprocess with *stream* ('stdout' or
        'stderr') redirected to the write end produced by *rwpair_generator*.

        *check_output(receiver_fd, proc)* inspects the captured output while
        the child runs; *post_child_check()*, if given, runs after the child
        exited with status 0. *stdin_generator* is a context manager yielding
        the child's stdin (defaults to /dev/null).
        """
        assert stream in ('stdout', 'stderr')
        if stdin_generator is None:
            stdin_generator = open(os.devnull, 'rb')
        with rwpair_generator() as (
            stream_receiver,
            child_stream,
        ), stdin_generator as child_stdin:
            proc = subprocess.Popen(
                [sys.executable] + python_args + ['-c', child_script],
                stdin=child_stdin,
                stdout=child_stream if stream == 'stdout' else None,
                stderr=child_stream if stream == 'stderr' else None,
            )
            try:
                # Close our copy of the child's end so that reads on the
                # receiver see EOF once the child exits.
                os.close(child_stream)
                if stream_receiver is not None:
                    check_output(stream_receiver, proc)
            except:  # re-raises
                proc.terminate()
                raise
            finally:
                retcode = proc.wait()
        self.assertEqual(retcode, 0)
        if post_child_check is not None:
            post_child_check()

    def _test_buffering(
        self, stream, rwpair_generator, expected_output, python_args=[]
    ):
        """Assert that the buffering child script produces *expected_output*
        (one of UNBUFFERED / LINE_BUFFERED / FULLY_BUFFERED, or None for
        /dev/null where nothing can be read back)."""

        def check_output(stream_receiver, proc):
            self.assertEqual(_readall(stream_receiver, 1024), expected_output)

        self._test(
            TEST_BUFFERING_CHILD_SCRIPT.format(stream=stream),
            stream,
            rwpair_generator,
            check_output,
            python_args,
        )

    def test_buffering_stdout_devnull(self):
        self._test_buffering('stdout', _devnull, None)

    def test_buffering_stdout_pipes(self):
        self._test_buffering('stdout', _pipes, FULLY_BUFFERED)

    def test_buffering_stdout_ptys(self):
        self._test_buffering('stdout', _ptys, LINE_BUFFERED)

    def test_buffering_stdout_devnull_unbuffered(self):
        self._test_buffering('stdout', _devnull, None, python_args=['-u'])

    def test_buffering_stdout_pipes_unbuffered(self):
        self._test_buffering('stdout', _pipes, UNBUFFERED, python_args=['-u'])

    def test_buffering_stdout_ptys_unbuffered(self):
        self._test_buffering('stdout', _ptys, UNBUFFERED, python_args=['-u'])

    if not pycompat.ispy3 and not pycompat.iswindows:
        # On Python 2 on non-Windows, we manually open stdout in line-buffered
        # mode if connected to a TTY. We should check if Python was configured
        # to use unbuffered stdout, but it's hard to do that.
        test_buffering_stdout_ptys_unbuffered = unittest.expectedFailure(
            test_buffering_stdout_ptys_unbuffered
        )

    def _test_large_write(self, stream, rwpair_generator, python_args=[]):
        """Check that a 1 MiB write completes fully, even when interrupted
        by a signal part-way through (Unix only)."""
        if not pycompat.ispy3 and pycompat.isdarwin:
            # Python 2 doesn't always retry on EINTR, but the libc might retry.
            # So far, it was observed only on macOS that EINTR is raised at the
            # Python level. As Python 2 support will be dropped soon-ish, we
            # won't attempt to fix it.
            raise unittest.SkipTest("raises EINTR on macOS")

        def check_output(stream_receiver, proc):
            if not pycompat.iswindows:
                # On Unix, we can provoke a partial write() by interrupting it
                # by a signal handler as soon as a bit of data was written.
                # We test that write() is called until all data is written.
                buf = [os.read(stream_receiver, 1)]
                proc.send_signal(signal.SIGINT)
            else:
                # On Windows, there doesn't seem to be a way to cause partial
                # writes.
                buf = []
            self.assertEqual(
                _readall(stream_receiver, 131072, buf), b'x' * 1048576
            )

        def post_child_check():
            write_result_str = write_result_f.read()
            if pycompat.ispy3:
                # On Python 3, we test that the correct number of bytes is
                # claimed to have been written.
                expected_write_result_str = '1048576'
            else:
                # On Python 2, we only check that the large write does not
                # crash.
                expected_write_result_str = 'None'
            self.assertEqual(write_result_str, expected_write_result_str)

        with tempfile.NamedTemporaryFile('r') as write_result_f:
            self._test(
                TEST_LARGE_WRITE_CHILD_SCRIPT.format(
                    stream=stream, write_result_fn=write_result_f.name
                ),
                stream,
                rwpair_generator,
                check_output,
                python_args,
                post_child_check=post_child_check,
            )

    def test_large_write_stdout_devnull(self):
        self._test_large_write('stdout', _devnull)

    def test_large_write_stdout_pipes(self):
        self._test_large_write('stdout', _pipes)

    def test_large_write_stdout_ptys(self):
        self._test_large_write('stdout', _ptys)

    def test_large_write_stdout_devnull_unbuffered(self):
        self._test_large_write('stdout', _devnull, python_args=['-u'])

    def test_large_write_stdout_pipes_unbuffered(self):
        self._test_large_write('stdout', _pipes, python_args=['-u'])

    def test_large_write_stdout_ptys_unbuffered(self):
        self._test_large_write('stdout', _ptys, python_args=['-u'])

    def test_large_write_stderr_devnull(self):
        self._test_large_write('stderr', _devnull)

    def test_large_write_stderr_pipes(self):
        self._test_large_write('stderr', _pipes)

    def test_large_write_stderr_ptys(self):
        self._test_large_write('stderr', _ptys)

    def test_large_write_stderr_devnull_unbuffered(self):
        self._test_large_write('stderr', _devnull, python_args=['-u'])

    def test_large_write_stderr_pipes_unbuffered(self):
        self._test_large_write('stderr', _pipes, python_args=['-u'])

    def test_large_write_stderr_ptys_unbuffered(self):
        self._test_large_write('stderr', _ptys, python_args=['-u'])

    def _test_broken_pipe(self, stream):
        """Check that writing to a pipe whose read end was closed raises
        EPIPE in the child, by unpickling the exception the child recorded."""
        assert stream in ('stdout', 'stderr')

        def check_output(stream_receiver, proc):
            # Close the read end first, then unblock the child's
            # procutil.stdin.read(1) so its subsequent write hits EPIPE.
            os.close(stream_receiver)
            proc.stdin.write(b'x')
            proc.stdin.close()

        def post_child_check():
            err = util.pickle.load(err_f)
            self.assertEqual(err.errno, errno.EPIPE)
            self.assertEqual(err.strerror, "Broken pipe")

        with tempfile.NamedTemporaryFile('rb') as err_f:
            self._test(
                TEST_BROKEN_PIPE_CHILD_SCRIPT.format(
                    stream=stream, err_fn=err_f.name
                ),
                stream,
                _pipes,
                check_output,
                post_child_check=post_child_check,
                stdin_generator=util.nullcontextmanager(subprocess.PIPE),
            )

    def test_broken_pipe_stdout(self):
        self._test_broken_pipe('stdout')

    def test_broken_pipe_stderr(self):
        self._test_broken_pipe('stderr')


if __name__ == '__main__':
    import silenttestrunner

    silenttestrunner.main(__name__)