Mercurial > hg-stable
comparison mercurial/win32.py @ 49905:a9faacdc5943
typing: add type hints to mercurial/win32.py
These are the low-level functions that are imported by the mercurial.windows
module, which is in turn imported by mercurial.utils as the platform module.
The annotations are pretty straightforward, but pytype inferred very little of
this module on its own, likely because of the heavy ctypes usage. The change
also seems to trigger a pytype bug in procutil, now that pytype has an idea of
the underlying function type, so this change disables that warning to maintain
a working test.
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Thu, 15 Dec 2022 18:02:55 -0500 |
parents | 53e9422a9b45 |
children | 493034cc3265 |
comparison
equal
deleted
inserted
replaced
49904:7a80a614c9e5 | 49905:a9faacdc5943 |
---|---|
11 import errno | 11 import errno |
12 import msvcrt | 12 import msvcrt |
13 import os | 13 import os |
14 import random | 14 import random |
15 import subprocess | 15 import subprocess |
16 | |
17 from typing import ( | |
18 List, | |
19 NoReturn, | |
20 Optional, | |
21 Tuple, | |
22 ) | |
16 | 23 |
17 from . import ( | 24 from . import ( |
18 encoding, | 25 encoding, |
19 pycompat, | 26 pycompat, |
20 ) | 27 ) |
354 ctypes.c_void_p, | 361 ctypes.c_void_p, |
355 ] | 362 ] |
356 _kernel32.PeekNamedPipe.restype = _BOOL | 363 _kernel32.PeekNamedPipe.restype = _BOOL |
357 | 364 |
358 | 365 |
359 def _raiseoserror(name): | 366 def _raiseoserror(name: bytes) -> NoReturn: |
360 # Force the code to a signed int to avoid an 'int too large' error. | 367 # Force the code to a signed int to avoid an 'int too large' error. |
361 # See https://bugs.python.org/issue28474 | 368 # See https://bugs.python.org/issue28474 |
362 code = _kernel32.GetLastError() | 369 code = _kernel32.GetLastError() |
363 if code > 0x7FFFFFFF: | 370 if code > 0x7FFFFFFF: |
364 code -= 2 ** 32 | 371 code -= 2 ** 32 |
366 raise OSError( | 373 raise OSError( |
367 err.errno, '%s: %s' % (encoding.strfromlocal(name), err.strerror) | 374 err.errno, '%s: %s' % (encoding.strfromlocal(name), err.strerror) |
368 ) | 375 ) |
369 | 376 |
370 | 377 |
371 def _getfileinfo(name): | 378 def _getfileinfo(name: bytes) -> _BY_HANDLE_FILE_INFORMATION: |
372 fh = _kernel32.CreateFileA( | 379 fh = _kernel32.CreateFileA( |
373 name, | 380 name, |
374 0, | 381 0, |
375 _FILE_SHARE_READ | _FILE_SHARE_WRITE | _FILE_SHARE_DELETE, | 382 _FILE_SHARE_READ | _FILE_SHARE_WRITE | _FILE_SHARE_DELETE, |
376 None, | 383 None, |
387 return fi | 394 return fi |
388 finally: | 395 finally: |
389 _kernel32.CloseHandle(fh) | 396 _kernel32.CloseHandle(fh) |
390 | 397 |
391 | 398 |
392 def checkcertificatechain(cert, build=True): | 399 def checkcertificatechain(cert: bytes, build: bool = True) -> bool: |
393 """Tests the given certificate to see if there is a complete chain to a | 400 """Tests the given certificate to see if there is a complete chain to a |
394 trusted root certificate. As a side effect, missing certificates are | 401 trusted root certificate. As a side effect, missing certificates are |
395 downloaded and installed unless ``build=False``. True is returned if a | 402 downloaded and installed unless ``build=False``. True is returned if a |
396 chain to a trusted root exists (even if built on the fly), otherwise | 403 chain to a trusted root exists (even if built on the fly), otherwise |
397 False. NB: A chain to a trusted root does NOT imply that the certificate | 404 False. NB: A chain to a trusted root does NOT imply that the certificate |
437 if pchainctx: | 444 if pchainctx: |
438 _crypt32.CertFreeCertificateChain(pchainctx) | 445 _crypt32.CertFreeCertificateChain(pchainctx) |
439 _crypt32.CertFreeCertificateContext(certctx) | 446 _crypt32.CertFreeCertificateContext(certctx) |
440 | 447 |
441 | 448 |
442 def oslink(src, dst): | 449 def oslink(src: bytes, dst: bytes) -> None: |
443 try: | 450 try: |
444 if not _kernel32.CreateHardLinkA(dst, src, None): | 451 if not _kernel32.CreateHardLinkA(dst, src, None): |
445 _raiseoserror(src) | 452 _raiseoserror(src) |
446 except AttributeError: # Wine doesn't support this function | 453 except AttributeError: # Wine doesn't support this function |
447 _raiseoserror(src) | 454 _raiseoserror(src) |
448 | 455 |
449 | 456 |
450 def nlinks(name): | 457 def nlinks(name: bytes) -> int: |
451 '''return number of hardlinks for the given file''' | 458 '''return number of hardlinks for the given file''' |
452 return _getfileinfo(name).nNumberOfLinks | 459 return _getfileinfo(name).nNumberOfLinks |
453 | 460 |
454 | 461 |
455 def samefile(path1, path2): | 462 def samefile(path1: bytes, path2: bytes) -> bool: |
456 '''Returns whether path1 and path2 refer to the same file or directory.''' | 463 '''Returns whether path1 and path2 refer to the same file or directory.''' |
457 res1 = _getfileinfo(path1) | 464 res1 = _getfileinfo(path1) |
458 res2 = _getfileinfo(path2) | 465 res2 = _getfileinfo(path2) |
459 return ( | 466 return ( |
460 res1.dwVolumeSerialNumber == res2.dwVolumeSerialNumber | 467 res1.dwVolumeSerialNumber == res2.dwVolumeSerialNumber |
461 and res1.nFileIndexHigh == res2.nFileIndexHigh | 468 and res1.nFileIndexHigh == res2.nFileIndexHigh |
462 and res1.nFileIndexLow == res2.nFileIndexLow | 469 and res1.nFileIndexLow == res2.nFileIndexLow |
463 ) | 470 ) |
464 | 471 |
465 | 472 |
466 def samedevice(path1, path2): | 473 def samedevice(path1: bytes, path2: bytes) -> bool: |
467 '''Returns whether path1 and path2 are on the same device.''' | 474 '''Returns whether path1 and path2 are on the same device.''' |
468 res1 = _getfileinfo(path1) | 475 res1 = _getfileinfo(path1) |
469 res2 = _getfileinfo(path2) | 476 res2 = _getfileinfo(path2) |
470 return res1.dwVolumeSerialNumber == res2.dwVolumeSerialNumber | 477 return res1.dwVolumeSerialNumber == res2.dwVolumeSerialNumber |
471 | 478 |
472 | 479 |
473 def peekpipe(pipe): | 480 def peekpipe(pipe) -> int: |
474 handle = msvcrt.get_osfhandle(pipe.fileno()) # pytype: disable=module-attr | 481 handle = msvcrt.get_osfhandle(pipe.fileno()) # pytype: disable=module-attr |
475 avail = _DWORD() | 482 avail = _DWORD() |
476 | 483 |
477 if not _kernel32.PeekNamedPipe( | 484 if not _kernel32.PeekNamedPipe( |
478 handle, None, 0, None, ctypes.byref(avail), None | 485 handle, None, 0, None, ctypes.byref(avail), None |
483 raise ctypes.WinError(err) # pytype: disable=module-attr | 490 raise ctypes.WinError(err) # pytype: disable=module-attr |
484 | 491 |
485 return avail.value | 492 return avail.value |
486 | 493 |
487 | 494 |
488 def lasterrorwaspipeerror(err): | 495 def lasterrorwaspipeerror(err) -> bool: |
489 if err.errno != errno.EINVAL: | 496 if err.errno != errno.EINVAL: |
490 return False | 497 return False |
491 err = _kernel32.GetLastError() | 498 err = _kernel32.GetLastError() |
492 return err == _ERROR_BROKEN_PIPE or err == _ERROR_NO_DATA | 499 return err == _ERROR_BROKEN_PIPE or err == _ERROR_NO_DATA |
493 | 500 |
494 | 501 |
495 def testpid(pid): | 502 def testpid(pid: int) -> bool: |
496 """return True if pid is still running or unable to | 503 """return True if pid is still running or unable to |
497 determine, False otherwise""" | 504 determine, False otherwise""" |
498 h = _kernel32.OpenProcess(_PROCESS_QUERY_INFORMATION, False, pid) | 505 h = _kernel32.OpenProcess(_PROCESS_QUERY_INFORMATION, False, pid) |
499 if h: | 506 if h: |
500 try: | 507 try: |
504 finally: | 511 finally: |
505 _kernel32.CloseHandle(h) | 512 _kernel32.CloseHandle(h) |
506 return _kernel32.GetLastError() != _ERROR_INVALID_PARAMETER | 513 return _kernel32.GetLastError() != _ERROR_INVALID_PARAMETER |
507 | 514 |
508 | 515 |
509 def executablepath(): | 516 def executablepath() -> bytes: |
510 '''return full path of hg.exe''' | 517 '''return full path of hg.exe''' |
511 size = 600 | 518 size = 600 |
512 buf = ctypes.create_string_buffer(size + 1) | 519 buf = ctypes.create_string_buffer(size + 1) |
513 len = _kernel32.GetModuleFileNameA(None, ctypes.byref(buf), size) | 520 len = _kernel32.GetModuleFileNameA(None, ctypes.byref(buf), size) |
514 # pytype: disable=module-attr | 521 # pytype: disable=module-attr |
518 raise ctypes.WinError(_ERROR_INSUFFICIENT_BUFFER) | 525 raise ctypes.WinError(_ERROR_INSUFFICIENT_BUFFER) |
519 # pytype: enable=module-attr | 526 # pytype: enable=module-attr |
520 return buf.value | 527 return buf.value |
521 | 528 |
522 | 529 |
523 def getvolumename(path): | 530 def getvolumename(path: bytes) -> Optional[bytes]: |
524 """Get the mount point of the filesystem from a directory or file | 531 """Get the mount point of the filesystem from a directory or file |
525 (best-effort) | 532 (best-effort) |
526 | 533 |
527 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc. | 534 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc. |
528 """ | 535 """ |
539 raise ctypes.WinError() # pytype: disable=module-attr | 546 raise ctypes.WinError() # pytype: disable=module-attr |
540 | 547 |
541 return buf.value | 548 return buf.value |
542 | 549 |
543 | 550 |
544 def getfstype(path): | 551 def getfstype(path: bytes) -> Optional[bytes]: |
545 """Get the filesystem type name from a directory or file (best-effort) | 552 """Get the filesystem type name from a directory or file (best-effort) |
546 | 553 |
547 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc. | 554 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc. |
548 """ | 555 """ |
549 volume = getvolumename(path) | 556 volume = getvolumename(path) |
570 raise ctypes.WinError() # pytype: disable=module-attr | 577 raise ctypes.WinError() # pytype: disable=module-attr |
571 | 578 |
572 return name.value | 579 return name.value |
573 | 580 |
574 | 581 |
575 def getuser(): | 582 def getuser() -> bytes: |
576 '''return name of current user''' | 583 '''return name of current user''' |
577 size = _DWORD(300) | 584 size = _DWORD(300) |
578 buf = ctypes.create_string_buffer(size.value + 1) | 585 buf = ctypes.create_string_buffer(size.value + 1) |
579 if not _advapi32.GetUserNameA(ctypes.byref(buf), ctypes.byref(size)): | 586 if not _advapi32.GetUserNameA(ctypes.byref(buf), ctypes.byref(size)): |
580 raise ctypes.WinError() # pytype: disable=module-attr | 587 raise ctypes.WinError() # pytype: disable=module-attr |
581 return buf.value | 588 return buf.value |
582 | 589 |
583 | 590 |
584 _signalhandler = [] | 591 _signalhandler: List[_SIGNAL_HANDLER] = [] |
585 | 592 |
586 | 593 |
587 def setsignalhandler(): | 594 def setsignalhandler() -> None: |
588 """Register a termination handler for console events including | 595 """Register a termination handler for console events including |
589 CTRL+C. python signal handlers do not work well with socket | 596 CTRL+C. python signal handlers do not work well with socket |
590 operations. | 597 operations. |
591 """ | 598 """ |
592 | 599 |
599 _signalhandler.append(h) # needed to prevent garbage collection | 606 _signalhandler.append(h) # needed to prevent garbage collection |
600 if not _kernel32.SetConsoleCtrlHandler(h, True): | 607 if not _kernel32.SetConsoleCtrlHandler(h, True): |
601 raise ctypes.WinError() # pytype: disable=module-attr | 608 raise ctypes.WinError() # pytype: disable=module-attr |
602 | 609 |
603 | 610 |
604 def hidewindow(): | 611 def hidewindow() -> None: |
605 def callback(hwnd, pid): | 612 def callback(hwnd, pid): |
606 wpid = _DWORD() | 613 wpid = _DWORD() |
607 _user32.GetWindowThreadProcessId(hwnd, ctypes.byref(wpid)) | 614 _user32.GetWindowThreadProcessId(hwnd, ctypes.byref(wpid)) |
608 if pid == wpid.value: | 615 if pid == wpid.value: |
609 _user32.ShowWindow(hwnd, _SW_HIDE) | 616 _user32.ShowWindow(hwnd, _SW_HIDE) |
612 | 619 |
613 pid = _kernel32.GetCurrentProcessId() | 620 pid = _kernel32.GetCurrentProcessId() |
614 _user32.EnumWindows(_WNDENUMPROC(callback), pid) | 621 _user32.EnumWindows(_WNDENUMPROC(callback), pid) |
615 | 622 |
616 | 623 |
617 def termsize(): | 624 def termsize() -> Tuple[int, int]: |
618 # cmd.exe does not handle CR like a unix console, the CR is | 625 # cmd.exe does not handle CR like a unix console, the CR is |
619 # counted in the line length. On 80 columns consoles, if 80 | 626 # counted in the line length. On 80 columns consoles, if 80 |
620 # characters are written, the following CR won't apply on the | 627 # characters are written, the following CR won't apply on the |
621 # current line but on the new one. Keep room for it. | 628 # current line but on the new one. Keep room for it. |
622 width = 80 - 1 | 629 width = 80 - 1 |
633 width = csbi.srWindow.Right - csbi.srWindow.Left # don't '+ 1' | 640 width = csbi.srWindow.Right - csbi.srWindow.Left # don't '+ 1' |
634 height = csbi.srWindow.Bottom - csbi.srWindow.Top + 1 | 641 height = csbi.srWindow.Bottom - csbi.srWindow.Top + 1 |
635 return width, height | 642 return width, height |
636 | 643 |
637 | 644 |
638 def enablevtmode(): | 645 def enablevtmode() -> bool: |
639 """Enable virtual terminal mode for the associated console. Return True if | 646 """Enable virtual terminal mode for the associated console. Return True if |
640 enabled, else False.""" | 647 enabled, else False.""" |
641 | 648 |
642 ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x4 | 649 ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x4 |
643 | 650 |
659 return False | 666 return False |
660 | 667 |
661 return True | 668 return True |
662 | 669 |
663 | 670 |
664 def spawndetached(args): | 671 def spawndetached(args: List[bytes]) -> int: |
665 # No standard library function really spawns a fully detached | 672 # No standard library function really spawns a fully detached |
666 # process under win32 because they allocate pipes or other objects | 673 # process under win32 because they allocate pipes or other objects |
667 # to handle standard streams communications. Passing these objects | 674 # to handle standard streams communications. Passing these objects |
668 # to the child process requires handle inheritance to be enabled | 675 # to the child process requires handle inheritance to be enabled |
669 # which makes really detached processes impossible. | 676 # which makes really detached processes impossible. |
701 _kernel32.CloseHandle(pi.hThread) | 708 _kernel32.CloseHandle(pi.hThread) |
702 | 709 |
703 return pi.dwProcessId | 710 return pi.dwProcessId |
704 | 711 |
705 | 712 |
706 def unlink(f): | 713 def unlink(f: bytes) -> None: |
707 '''try to implement POSIX' unlink semantics on Windows''' | 714 '''try to implement POSIX' unlink semantics on Windows''' |
708 | 715 |
709 if os.path.isdir(f): | 716 if os.path.isdir(f): |
710 # use EPERM because it is POSIX prescribed value, even though | 717 # use EPERM because it is POSIX prescribed value, even though |
711 # unlink(2) on directories returns EISDIR on Linux | 718 # unlink(2) on directories returns EISDIR on Linux |
756 # Leaking a tempfile is the lesser evil than aborting here and | 763 # Leaking a tempfile is the lesser evil than aborting here and |
757 # leaving some potentially serious inconsistencies. | 764 # leaving some potentially serious inconsistencies. |
758 pass | 765 pass |
759 | 766 |
760 | 767 |
761 def makedir(path, notindexed): | 768 def makedir(path: bytes, notindexed: bool) -> None: |
762 os.mkdir(path) | 769 os.mkdir(path) |
763 if notindexed: | 770 if notindexed: |
764 _kernel32.SetFileAttributesA(path, _FILE_ATTRIBUTE_NOT_CONTENT_INDEXED) | 771 _kernel32.SetFileAttributesA(path, _FILE_ATTRIBUTE_NOT_CONTENT_INDEXED) |