view mercurial/win32.py @ 52279:b267c5764cc6 stable 6.9

relnotes: write final 6.9 notes I've folded bugfixes that only exist to fix 6.9-specific things to the best of my attention span for this task.
author Raphaël Gomès <rgomes@octobus.net>
date Wed, 20 Nov 2024 14:41:23 +0100
parents f4733654f144
children
line wrap: on
line source

# win32.py - utility functions that use win32 API
#
# Copyright 2005-2009 Olivia Mackall <olivia@selenic.com> and others
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import annotations

import ctypes
import ctypes.wintypes as wintypes
import errno
import msvcrt
import os
import random
import subprocess

from typing import (
    List,
    NoReturn,
    Optional,
    Tuple,
)

from . import (
    encoding,
    pycompat,
)

# pytype: disable=module-attr
_kernel32 = ctypes.windll.kernel32
_advapi32 = ctypes.windll.advapi32
_user32 = ctypes.windll.user32
_crypt32 = ctypes.windll.crypt32
# pytype: enable=module-attr

_BOOL = ctypes.c_long
_WORD = ctypes.c_ushort
_DWORD = ctypes.c_ulong
_UINT = ctypes.c_uint
_LONG = ctypes.c_long
_LPCSTR = _LPSTR = ctypes.c_char_p
_HANDLE = ctypes.c_void_p
_HWND = _HANDLE
_PCCERT_CONTEXT = ctypes.c_void_p
_MAX_PATH = wintypes.MAX_PATH

_INVALID_HANDLE_VALUE = _HANDLE(-1).value

# GetLastError
_ERROR_SUCCESS = 0
_ERROR_NO_MORE_FILES = 18
_ERROR_INVALID_PARAMETER = 87
_ERROR_BROKEN_PIPE = 109
_ERROR_INSUFFICIENT_BUFFER = 122
_ERROR_NO_DATA = 232

# WPARAM is defined as UINT_PTR (unsigned type)
# LPARAM is defined as LONG_PTR (signed type)
if ctypes.sizeof(ctypes.c_long) == ctypes.sizeof(ctypes.c_void_p):
    _WPARAM = ctypes.c_ulong
    _LPARAM = ctypes.c_long
elif ctypes.sizeof(ctypes.c_longlong) == ctypes.sizeof(ctypes.c_void_p):
    _WPARAM = ctypes.c_ulonglong
    _LPARAM = ctypes.c_longlong


class _FILETIME(ctypes.Structure):
    _fields_ = [('dwLowDateTime', _DWORD), ('dwHighDateTime', _DWORD)]


class _BY_HANDLE_FILE_INFORMATION(ctypes.Structure):
    _fields_ = [
        ('dwFileAttributes', _DWORD),
        ('ftCreationTime', _FILETIME),
        ('ftLastAccessTime', _FILETIME),
        ('ftLastWriteTime', _FILETIME),
        ('dwVolumeSerialNumber', _DWORD),
        ('nFileSizeHigh', _DWORD),
        ('nFileSizeLow', _DWORD),
        ('nNumberOfLinks', _DWORD),
        ('nFileIndexHigh', _DWORD),
        ('nFileIndexLow', _DWORD),
    ]


# CreateFile
_FILE_SHARE_READ = 0x00000001
_FILE_SHARE_WRITE = 0x00000002
_FILE_SHARE_DELETE = 0x00000004

_OPEN_EXISTING = 3

_FILE_FLAG_BACKUP_SEMANTICS = 0x02000000

# SetFileAttributes
_FILE_ATTRIBUTE_NORMAL = 0x80
_FILE_ATTRIBUTE_NOT_CONTENT_INDEXED = 0x2000

# Process Security and Access Rights
_PROCESS_QUERY_INFORMATION = 0x0400

# GetExitCodeProcess
_STILL_ACTIVE = 259


class _STARTUPINFO(ctypes.Structure):
    _fields_ = [
        ('cb', _DWORD),
        ('lpReserved', _LPSTR),
        ('lpDesktop', _LPSTR),
        ('lpTitle', _LPSTR),
        ('dwX', _DWORD),
        ('dwY', _DWORD),
        ('dwXSize', _DWORD),
        ('dwYSize', _DWORD),
        ('dwXCountChars', _DWORD),
        ('dwYCountChars', _DWORD),
        ('dwFillAttribute', _DWORD),
        ('dwFlags', _DWORD),
        ('wShowWindow', _WORD),
        ('cbReserved2', _WORD),
        ('lpReserved2', ctypes.c_char_p),
        ('hStdInput', _HANDLE),
        ('hStdOutput', _HANDLE),
        ('hStdError', _HANDLE),
    ]


class _PROCESS_INFORMATION(ctypes.Structure):
    _fields_ = [
        ('hProcess', _HANDLE),
        ('hThread', _HANDLE),
        ('dwProcessId', _DWORD),
        ('dwThreadId', _DWORD),
    ]


_CREATE_NO_WINDOW = 0x08000000
_SW_HIDE = 0


class _COORD(ctypes.Structure):
    _fields_ = [('X', ctypes.c_short), ('Y', ctypes.c_short)]


class _SMALL_RECT(ctypes.Structure):
    _fields_ = [
        ('Left', ctypes.c_short),
        ('Top', ctypes.c_short),
        ('Right', ctypes.c_short),
        ('Bottom', ctypes.c_short),
    ]


class _CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure):
    _fields_ = [
        ('dwSize', _COORD),
        ('dwCursorPosition', _COORD),
        ('wAttributes', _WORD),
        ('srWindow', _SMALL_RECT),
        ('dwMaximumWindowSize', _COORD),
    ]


_STD_OUTPUT_HANDLE = _DWORD(-11).value
_STD_ERROR_HANDLE = _DWORD(-12).value

# CERT_TRUST_STATUS dwErrorStatus
CERT_TRUST_IS_PARTIAL_CHAIN = 0x10000

# CertCreateCertificateContext encodings
X509_ASN_ENCODING = 0x00000001
PKCS_7_ASN_ENCODING = 0x00010000


# These structs are only complete enough to achieve what we need.
class CERT_CHAIN_CONTEXT(ctypes.Structure):
    _fields_ = (
        ("cbSize", _DWORD),
        # CERT_TRUST_STATUS struct
        ("dwErrorStatus", _DWORD),
        ("dwInfoStatus", _DWORD),
        ("cChain", _DWORD),
        ("rgpChain", ctypes.c_void_p),
        ("cLowerQualityChainContext", _DWORD),
        ("rgpLowerQualityChainContext", ctypes.c_void_p),
        ("fHasRevocationFreshnessTime", _BOOL),
        ("dwRevocationFreshnessTime", _DWORD),
    )


class CERT_USAGE_MATCH(ctypes.Structure):
    _fields_ = (
        ("dwType", _DWORD),
        # CERT_ENHKEY_USAGE struct
        ("cUsageIdentifier", _DWORD),
        ("rgpszUsageIdentifier", ctypes.c_void_p),  # LPSTR *
    )


class CERT_CHAIN_PARA(ctypes.Structure):
    _fields_ = (
        ("cbSize", _DWORD),
        ("RequestedUsage", CERT_USAGE_MATCH),
        ("RequestedIssuancePolicy", CERT_USAGE_MATCH),
        ("dwUrlRetrievalTimeout", _DWORD),
        ("fCheckRevocationFreshnessTime", _BOOL),
        ("dwRevocationFreshnessTime", _DWORD),
        ("pftCacheResync", ctypes.c_void_p),  # LPFILETIME
        ("pStrongSignPara", ctypes.c_void_p),  # PCCERT_STRONG_SIGN_PARA
        ("dwStrongSignFlags", _DWORD),
    )


# types of parameters of C functions used (required by pypy)

_crypt32.CertCreateCertificateContext.argtypes = [
    _DWORD,  # cert encoding
    ctypes.c_char_p,  # cert
    _DWORD,
]  # cert size
_crypt32.CertCreateCertificateContext.restype = _PCCERT_CONTEXT

_crypt32.CertGetCertificateChain.argtypes = [
    ctypes.c_void_p,  # HCERTCHAINENGINE
    _PCCERT_CONTEXT,
    ctypes.c_void_p,  # LPFILETIME
    ctypes.c_void_p,  # HCERTSTORE
    ctypes.c_void_p,  # PCERT_CHAIN_PARA
    _DWORD,
    ctypes.c_void_p,  # LPVOID
    ctypes.c_void_p,  # PCCERT_CHAIN_CONTEXT *
]
_crypt32.CertGetCertificateChain.restype = _BOOL

_crypt32.CertFreeCertificateContext.argtypes = [_PCCERT_CONTEXT]
_crypt32.CertFreeCertificateContext.restype = _BOOL

_kernel32.CreateFileA.argtypes = [
    _LPCSTR,
    _DWORD,
    _DWORD,
    ctypes.c_void_p,
    _DWORD,
    _DWORD,
    _HANDLE,
]
_kernel32.CreateFileA.restype = _HANDLE

_kernel32.GetFileInformationByHandle.argtypes = [_HANDLE, ctypes.c_void_p]
_kernel32.GetFileInformationByHandle.restype = _BOOL

_kernel32.CloseHandle.argtypes = [_HANDLE]
_kernel32.CloseHandle.restype = _BOOL

try:
    _kernel32.CreateHardLinkA.argtypes = [_LPCSTR, _LPCSTR, ctypes.c_void_p]
    _kernel32.CreateHardLinkA.restype = _BOOL
except AttributeError:
    pass

_kernel32.SetFileAttributesA.argtypes = [_LPCSTR, _DWORD]
_kernel32.SetFileAttributesA.restype = _BOOL

_DRIVE_UNKNOWN = 0
_DRIVE_NO_ROOT_DIR = 1
_DRIVE_REMOVABLE = 2
_DRIVE_FIXED = 3
_DRIVE_REMOTE = 4
_DRIVE_CDROM = 5
_DRIVE_RAMDISK = 6

_kernel32.GetDriveTypeA.argtypes = [_LPCSTR]
_kernel32.GetDriveTypeA.restype = _UINT

_kernel32.GetVolumeInformationA.argtypes = [
    _LPCSTR,
    ctypes.c_void_p,
    _DWORD,
    ctypes.c_void_p,
    ctypes.c_void_p,
    ctypes.c_void_p,
    ctypes.c_void_p,
    _DWORD,
]
_kernel32.GetVolumeInformationA.restype = _BOOL

_kernel32.GetVolumePathNameA.argtypes = [_LPCSTR, ctypes.c_void_p, _DWORD]
_kernel32.GetVolumePathNameA.restype = _BOOL

_kernel32.OpenProcess.argtypes = [_DWORD, _BOOL, _DWORD]
_kernel32.OpenProcess.restype = _HANDLE

_kernel32.GetExitCodeProcess.argtypes = [_HANDLE, ctypes.c_void_p]
_kernel32.GetExitCodeProcess.restype = _BOOL

_kernel32.GetLastError.argtypes = []
_kernel32.GetLastError.restype = _DWORD

_kernel32.GetModuleFileNameA.argtypes = [_HANDLE, ctypes.c_void_p, _DWORD]
_kernel32.GetModuleFileNameA.restype = _DWORD

_kernel32.CreateProcessA.argtypes = [
    _LPCSTR,
    _LPCSTR,
    ctypes.c_void_p,
    ctypes.c_void_p,
    _BOOL,
    _DWORD,
    ctypes.c_void_p,
    _LPCSTR,
    ctypes.c_void_p,
    ctypes.c_void_p,
]
_kernel32.CreateProcessA.restype = _BOOL

_kernel32.ExitProcess.argtypes = [_UINT]
_kernel32.ExitProcess.restype = None

_kernel32.GetCurrentProcessId.argtypes = []
_kernel32.GetCurrentProcessId.restype = _DWORD

# pytype: disable=module-attr
_SIGNAL_HANDLER = ctypes.WINFUNCTYPE(_BOOL, _DWORD)
# pytype: enable=module-attr
_kernel32.SetConsoleCtrlHandler.argtypes = [_SIGNAL_HANDLER, _BOOL]
_kernel32.SetConsoleCtrlHandler.restype = _BOOL

_kernel32.SetConsoleMode.argtypes = [_HANDLE, _DWORD]
_kernel32.SetConsoleMode.restype = _BOOL

_kernel32.GetConsoleMode.argtypes = [_HANDLE, ctypes.c_void_p]
_kernel32.GetConsoleMode.restype = _BOOL

_kernel32.GetStdHandle.argtypes = [_DWORD]
_kernel32.GetStdHandle.restype = _HANDLE

_kernel32.GetConsoleScreenBufferInfo.argtypes = [_HANDLE, ctypes.c_void_p]
_kernel32.GetConsoleScreenBufferInfo.restype = _BOOL

_advapi32.GetUserNameA.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
_advapi32.GetUserNameA.restype = _BOOL

_user32.GetWindowThreadProcessId.argtypes = [_HANDLE, ctypes.c_void_p]
_user32.GetWindowThreadProcessId.restype = _DWORD

_user32.ShowWindow.argtypes = [_HANDLE, ctypes.c_int]
_user32.ShowWindow.restype = _BOOL

# pytype: disable=module-attr
_WNDENUMPROC = ctypes.WINFUNCTYPE(_BOOL, _HWND, _LPARAM)
# pytype: enable=module-attr
_user32.EnumWindows.argtypes = [_WNDENUMPROC, _LPARAM]
_user32.EnumWindows.restype = _BOOL

_kernel32.PeekNamedPipe.argtypes = [
    _HANDLE,
    ctypes.c_void_p,
    _DWORD,
    ctypes.c_void_p,
    ctypes.c_void_p,
    ctypes.c_void_p,
]
_kernel32.PeekNamedPipe.restype = _BOOL


def _raiseoserror(name: bytes) -> NoReturn:
    # Force the code to a signed int to avoid an 'int too large' error.
    # See https://bugs.python.org/issue28474
    code = _kernel32.GetLastError()
    if code > 0x7FFFFFFF:
        code -= 2**32
    err = ctypes.WinError(code=code)  # pytype: disable=module-attr
    raise OSError(
        err.errno, '%s: %s' % (encoding.strfromlocal(name), err.strerror)
    )


def _getfileinfo(name: bytes) -> _BY_HANDLE_FILE_INFORMATION:
    fh = _kernel32.CreateFileA(
        name,
        0,
        _FILE_SHARE_READ | _FILE_SHARE_WRITE | _FILE_SHARE_DELETE,
        None,
        _OPEN_EXISTING,
        _FILE_FLAG_BACKUP_SEMANTICS,
        None,
    )
    if fh == _INVALID_HANDLE_VALUE:
        _raiseoserror(name)
    try:
        fi = _BY_HANDLE_FILE_INFORMATION()
        if not _kernel32.GetFileInformationByHandle(fh, ctypes.byref(fi)):
            _raiseoserror(name)
        return fi
    finally:
        _kernel32.CloseHandle(fh)


def checkcertificatechain(cert: bytes, build: bool = True) -> bool:
    """Tests the given certificate to see if there is a complete chain to a
    trusted root certificate.  As a side effect, missing certificates are
    downloaded and installed unless ``build=False``.  True is returned if a
    chain to a trusted root exists (even if built on the fly), otherwise
    False.  NB: A chain to a trusted root does NOT imply that the certificate
    is valid.
    """

    chainctxptr = ctypes.POINTER(CERT_CHAIN_CONTEXT)

    pchainctx = chainctxptr()
    chainpara = CERT_CHAIN_PARA(
        cbSize=ctypes.sizeof(CERT_CHAIN_PARA), RequestedUsage=CERT_USAGE_MATCH()
    )

    certctx = _crypt32.CertCreateCertificateContext(
        X509_ASN_ENCODING, cert, len(cert)
    )
    if certctx is None:
        _raiseoserror(b'CertCreateCertificateContext')

    flags = 0

    if not build:
        flags |= 0x100  # CERT_CHAIN_DISABLE_AUTH_ROOT_AUTO_UPDATE

    try:
        # Building the certificate chain will update root certs as necessary.
        if not _crypt32.CertGetCertificateChain(
            None,  # hChainEngine
            certctx,  # pCertContext
            None,  # pTime
            None,  # hAdditionalStore
            ctypes.byref(chainpara),
            flags,
            None,  # pvReserved
            ctypes.byref(pchainctx),
        ):
            _raiseoserror(b'CertGetCertificateChain')

        chainctx = pchainctx.contents

        return chainctx.dwErrorStatus & CERT_TRUST_IS_PARTIAL_CHAIN == 0
    finally:
        if pchainctx:
            _crypt32.CertFreeCertificateChain(pchainctx)
        _crypt32.CertFreeCertificateContext(certctx)


def oslink(src: bytes, dst: bytes) -> None:
    try:
        if not _kernel32.CreateHardLinkA(dst, src, None):
            _raiseoserror(src)
    except AttributeError:  # Wine doesn't support this function
        _raiseoserror(src)


def nlinks(name: bytes) -> int:
    '''return number of hardlinks for the given file'''
    return _getfileinfo(name).nNumberOfLinks


def samefile(fpath1: bytes, fpath2: bytes) -> bool:
    '''Returns whether fpath1 and fpath2 refer to the same file or directory.'''
    res1 = _getfileinfo(fpath1)
    res2 = _getfileinfo(fpath2)
    return (
        res1.dwVolumeSerialNumber == res2.dwVolumeSerialNumber
        and res1.nFileIndexHigh == res2.nFileIndexHigh
        and res1.nFileIndexLow == res2.nFileIndexLow
    )


def samedevice(fpath1: bytes, fpath2: bytes) -> bool:
    '''Returns whether fpath1 and fpath2 are on the same device.'''
    res1 = _getfileinfo(fpath1)
    res2 = _getfileinfo(fpath2)
    return res1.dwVolumeSerialNumber == res2.dwVolumeSerialNumber


def peekpipe(pipe) -> int:
    handle = msvcrt.get_osfhandle(pipe.fileno())  # pytype: disable=module-attr
    avail = _DWORD()

    if not _kernel32.PeekNamedPipe(
        handle, None, 0, None, ctypes.byref(avail), None
    ):
        err = _kernel32.GetLastError()
        if err == _ERROR_BROKEN_PIPE:
            return 0
        raise ctypes.WinError(err)  # pytype: disable=module-attr

    return avail.value


def lasterrorwaspipeerror(err) -> bool:
    if err.errno != errno.EINVAL:
        return False
    err = _kernel32.GetLastError()
    return err == _ERROR_BROKEN_PIPE or err == _ERROR_NO_DATA


def testpid(pid: int) -> bool:
    """return True if pid is still running or unable to
    determine, False otherwise"""
    h = _kernel32.OpenProcess(_PROCESS_QUERY_INFORMATION, False, pid)
    if h:
        try:
            status = _DWORD()
            if _kernel32.GetExitCodeProcess(h, ctypes.byref(status)):
                return status.value == _STILL_ACTIVE
        finally:
            _kernel32.CloseHandle(h)
    return _kernel32.GetLastError() != _ERROR_INVALID_PARAMETER


def executablepath() -> bytes:
    '''return full path of hg.exe'''
    size = 600
    buf = ctypes.create_string_buffer(size + 1)
    len = _kernel32.GetModuleFileNameA(None, ctypes.byref(buf), size)
    # pytype: disable=module-attr
    if len == 0:
        raise ctypes.WinError()  # Note: WinError is a function
    elif len == size:
        raise ctypes.WinError(_ERROR_INSUFFICIENT_BUFFER)
    # pytype: enable=module-attr
    return buf.value


def getvolumename(path: bytes) -> Optional[bytes]:
    """Get the mount point of the filesystem from a directory or file
    (best-effort)

    Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc.
    """
    # realpath() calls GetFullPathName()
    realpath = os.path.realpath(path)

    # allocate at least MAX_PATH long since GetVolumePathName('c:\\', buf, 4)
    # somehow fails on Windows XP
    size = max(len(realpath), _MAX_PATH) + 1
    buf = ctypes.create_string_buffer(size)

    if not _kernel32.GetVolumePathNameA(realpath, ctypes.byref(buf), size):
        # Note: WinError is a function
        raise ctypes.WinError()  # pytype: disable=module-attr

    return buf.value


def getfstype(path: bytes) -> Optional[bytes]:
    """Get the filesystem type name from a directory or file (best-effort)

    Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc.
    """
    volume = getvolumename(path)

    t = _kernel32.GetDriveTypeA(volume)

    if t == _DRIVE_REMOTE:
        return b'cifs'
    elif t not in (
        _DRIVE_REMOVABLE,
        _DRIVE_FIXED,
        _DRIVE_CDROM,
        _DRIVE_RAMDISK,
    ):
        return None

    size = _MAX_PATH + 1
    name = ctypes.create_string_buffer(size)

    if not _kernel32.GetVolumeInformationA(
        volume, None, 0, None, None, None, ctypes.byref(name), size
    ):
        # Note: WinError is a function
        raise ctypes.WinError()  # pytype: disable=module-attr

    return name.value


def getuser() -> bytes:
    '''return name of current user'''
    size = _DWORD(300)
    buf = ctypes.create_string_buffer(size.value + 1)
    if not _advapi32.GetUserNameA(ctypes.byref(buf), ctypes.byref(size)):
        raise ctypes.WinError()  # pytype: disable=module-attr
    return buf.value


_signalhandler: List[_SIGNAL_HANDLER] = []


def setsignalhandler() -> None:
    """Register a termination handler for console events including
    CTRL+C. python signal handlers do not work well with socket
    operations.
    """

    def handler(event):
        _kernel32.ExitProcess(1)

    if _signalhandler:
        return  # already registered
    h = _SIGNAL_HANDLER(handler)
    _signalhandler.append(h)  # needed to prevent garbage collection
    if not _kernel32.SetConsoleCtrlHandler(h, True):
        raise ctypes.WinError()  # pytype: disable=module-attr


def hidewindow() -> None:
    def callback(hwnd, pid):
        wpid = _DWORD()
        _user32.GetWindowThreadProcessId(hwnd, ctypes.byref(wpid))
        if pid == wpid.value:
            _user32.ShowWindow(hwnd, _SW_HIDE)
            return False  # stop enumerating windows
        return True

    pid = _kernel32.GetCurrentProcessId()
    _user32.EnumWindows(_WNDENUMPROC(callback), pid)


def termsize() -> Tuple[int, int]:
    # cmd.exe does not handle CR like a unix console, the CR is
    # counted in the line length. On 80 columns consoles, if 80
    # characters are written, the following CR won't apply on the
    # current line but on the new one. Keep room for it.
    width = 80 - 1
    height = 25
    # Query stderr to avoid problems with redirections
    screenbuf = _kernel32.GetStdHandle(
        _STD_ERROR_HANDLE
    )  # don't close the handle returned
    if screenbuf is None or screenbuf == _INVALID_HANDLE_VALUE:
        return width, height
    csbi = _CONSOLE_SCREEN_BUFFER_INFO()
    if not _kernel32.GetConsoleScreenBufferInfo(screenbuf, ctypes.byref(csbi)):
        return width, height
    width = csbi.srWindow.Right - csbi.srWindow.Left  # don't '+ 1'
    height = csbi.srWindow.Bottom - csbi.srWindow.Top + 1
    return width, height


def enablevtmode() -> bool:
    """Enable virtual terminal mode for the associated console.  Return True if
    enabled, else False."""

    ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x4

    handle = _kernel32.GetStdHandle(
        _STD_OUTPUT_HANDLE
    )  # don't close the handle
    if handle == _INVALID_HANDLE_VALUE:
        return False

    mode = _DWORD(0)

    if not _kernel32.GetConsoleMode(handle, ctypes.byref(mode)):
        return False

    if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0:
        mode.value |= ENABLE_VIRTUAL_TERMINAL_PROCESSING

        if not _kernel32.SetConsoleMode(handle, mode):
            return False

    return True


def spawndetached(args: List[bytes]) -> int:
    # No standard library function really spawns a fully detached
    # process under win32 because they allocate pipes or other objects
    # to handle standard streams communications. Passing these objects
    # to the child process requires handle inheritance to be enabled
    # which makes really detached processes impossible.
    si = _STARTUPINFO()
    si.cb = ctypes.sizeof(_STARTUPINFO)

    pi = _PROCESS_INFORMATION()

    env = b''
    for k in encoding.environ:
        env += b"%s=%s\0" % (k, encoding.environ[k])
    if not env:
        env = b'\0'
    env += b'\0'

    args = subprocess.list2cmdline(pycompat.rapply(encoding.strfromlocal, args))

    # TODO: CreateProcessW on py3?
    res = _kernel32.CreateProcessA(
        None,
        encoding.strtolocal(args),
        None,
        None,
        False,
        _CREATE_NO_WINDOW,
        env,
        encoding.getcwd(),
        ctypes.byref(si),
        ctypes.byref(pi),
    )
    if not res:
        raise ctypes.WinError()  # pytype: disable=module-attr

    _kernel32.CloseHandle(pi.hProcess)
    _kernel32.CloseHandle(pi.hThread)

    return pi.dwProcessId


def unlink(path: bytes) -> None:
    '''try to implement POSIX' unlink semantics on Windows'''

    if os.path.isdir(path):
        # use EPERM because it is POSIX prescribed value, even though
        # unlink(2) on directories returns EISDIR on Linux
        raise IOError(
            errno.EPERM,
            r"Unlinking directory not permitted: '%s'"
            % encoding.strfromlocal(path),
        )

    # POSIX allows to unlink and rename open files. Windows has serious
    # problems with doing that:
    # - Calling os.unlink (or os.rename) on a file f fails if f or any
    #   hardlinked copy of f has been opened with Python's open(). There is no
    #   way such a file can be deleted or renamed on Windows (other than
    #   scheduling the delete or rename for the next reboot).
    # - Calling os.unlink on a file that has been opened with Mercurial's
    #   posixfile (or comparable methods) will delay the actual deletion of
    #   the file for as long as the file is held open. The filename is blocked
    #   during that time and cannot be used for recreating a new file under
    #   that same name ("zombie file"). Directories containing such zombie files
    #   cannot be removed or moved.
    # A file that has been opened with posixfile can be renamed, so we rename
    # f to a random temporary name before calling os.unlink on it. This allows
    # callers to recreate f immediately while having other readers do their
    # implicit zombie filename blocking on a temporary name.

    for tries in range(10):
        temp = b'%s-%08x' % (path, random.randint(0, 0xFFFFFFFF))
        try:
            os.rename(path, temp)
            break
        except FileExistsError:
            pass
    else:
        raise IOError(errno.EEXIST, "No usable temporary filename found")

    try:
        os.unlink(temp)
    except OSError:
        # The unlink might have failed because the READONLY attribute may heave
        # been set on the original file. Rename works fine with READONLY set,
        # but not os.unlink. Reset all attributes and try again.
        _kernel32.SetFileAttributesA(temp, _FILE_ATTRIBUTE_NORMAL)
        try:
            os.unlink(temp)
        except OSError:
            # The unlink might have failed due to some very rude AV-Scanners.
            # Leaking a tempfile is the lesser evil than aborting here and
            # leaving some potentially serious inconsistencies.
            pass


def makedir(path: bytes, notindexed: bool) -> None:
    os.mkdir(path)
    if notindexed:
        _kernel32.SetFileAttributesA(path, _FILE_ATTRIBUTE_NOT_CONTENT_INDEXED)