comparison tests/test-stdio.py @ 45104:eb26a9cf7821

procutil: avoid use of deprecated tempfile.mktemp(). In the previous version, I used tempfile.mktemp() because it seemed to be the only way to open a file from two processes (the Python documentation says the file backing NamedTemporaryFile can’t be opened a second time on Windows). However, opening the file a second time is possible when the O_TEMPORARY flag is passed to the second open. Source: https://stackoverflow.com/a/15235559/6366251
author Manuel Jacob <me@manueljacob.de>
date Thu, 09 Jul 2020 12:52:04 +0200
parents dff208398ede
children c2c862b9b544
comparison
equal deleted inserted replaced
45103:a5fa2761a6cd 45104:eb26a9cf7821
32 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]' 32 LINE_BUFFERED = b'[written aaa]aaabbb\n[written bbb\\n]'
33 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n' 33 FULLY_BUFFERED = b'[written aaa][written bbb\\n]aaabbb\n'
34 34
35 35
36 TEST_LARGE_WRITE_CHILD_SCRIPT = r''' 36 TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
37 import os
37 import signal 38 import signal
38 import sys 39 import sys
39 40
40 from mercurial import dispatch 41 from mercurial import dispatch
41 from mercurial.utils import procutil 42 from mercurial.utils import procutil
42 43
43 signal.signal(signal.SIGINT, lambda *x: None) 44 signal.signal(signal.SIGINT, lambda *x: None)
44 dispatch.initstdio() 45 dispatch.initstdio()
45 write_result = procutil.{stream}.write(b'x' * 1048576) 46 write_result = procutil.{stream}.write(b'x' * 1048576)
46 with open({write_result_fn}, 'w') as write_result_f: 47 with os.fdopen(
48 os.open({write_result_fn!r}, os.O_WRONLY | getattr(os, 'O_TEMPORARY', 0)),
49 'w',
50 ) as write_result_f:
47 write_result_f.write(str(write_result)) 51 write_result_f.write(str(write_result))
48 ''' 52 '''
49 53
50 54
51 @contextlib.contextmanager 55 @contextlib.contextmanager
199 self.assertEqual( 203 self.assertEqual(
200 _readall(stream_receiver, 131072, buf), b'x' * 1048576 204 _readall(stream_receiver, 131072, buf), b'x' * 1048576
201 ) 205 )
202 206
203 def post_child_check(): 207 def post_child_check():
204 with open(write_result_fn, 'r') as write_result_f: 208 write_result_str = write_result_f.read()
205 write_result_str = write_result_f.read()
206 if pycompat.ispy3: 209 if pycompat.ispy3:
207 # On Python 3, we test that the correct number of bytes is 210 # On Python 3, we test that the correct number of bytes is
208 # claimed to have been written. 211 # claimed to have been written.
209 expected_write_result_str = '1048576' 212 expected_write_result_str = '1048576'
210 else: 213 else:
211 # On Python 2, we only check that the large write does not 214 # On Python 2, we only check that the large write does not
212 # crash. 215 # crash.
213 expected_write_result_str = 'None' 216 expected_write_result_str = 'None'
214 self.assertEqual(write_result_str, expected_write_result_str) 217 self.assertEqual(write_result_str, expected_write_result_str)
215 218
216 try: 219 with tempfile.NamedTemporaryFile('r') as write_result_f:
217 # tempfile.mktemp() is unsafe in general, as a malicious process
218 # could create the file before we do. But in tests, we're running
219 # in a controlled environment.
220 write_result_fn = tempfile.mktemp()
221 self._test( 220 self._test(
222 TEST_LARGE_WRITE_CHILD_SCRIPT.format( 221 TEST_LARGE_WRITE_CHILD_SCRIPT.format(
223 stream=stream, write_result_fn=repr(write_result_fn) 222 stream=stream, write_result_fn=write_result_f.name
224 ), 223 ),
225 stream, 224 stream,
226 rwpair_generator, 225 rwpair_generator,
227 check_output, 226 check_output,
228 python_args, 227 python_args,
229 post_child_check=post_child_check, 228 post_child_check=post_child_check,
230 ) 229 )
231 finally:
232 try:
233 os.unlink(write_result_fn)
234 except OSError:
235 pass
236 230
237 def test_large_write_stdout_devnull(self): 231 def test_large_write_stdout_devnull(self):
238 self._test_large_write('stdout', _devnull) 232 self._test_large_write('stdout', _devnull)
239 233
240 def test_large_write_stdout_pipes(self): 234 def test_large_write_stdout_pipes(self):