--- a/mercurial/win32.py Mon Jul 10 19:40:23 2017 +0200
+++ b/mercurial/win32.py Wed Mar 29 23:45:23 2017 -0400
@@ -22,6 +22,7 @@
_kernel32 = ctypes.windll.kernel32
_advapi32 = ctypes.windll.advapi32
_user32 = ctypes.windll.user32
+_crypt32 = ctypes.windll.crypt32
_BOOL = ctypes.c_long
_WORD = ctypes.c_ushort
@@ -31,6 +32,7 @@
_LPCSTR = _LPSTR = ctypes.c_char_p
_HANDLE = ctypes.c_void_p
_HWND = _HANDLE
+_PCCERT_CONTEXT = ctypes.c_void_p
_INVALID_HANDLE_VALUE = _HANDLE(-1).value
@@ -134,8 +136,74 @@
_STD_OUTPUT_HANDLE = _DWORD(-11).value
_STD_ERROR_HANDLE = _DWORD(-12).value
+# CERT_TRUST_STATUS dwErrorStatus
+CERT_TRUST_IS_PARTIAL_CHAIN = 0x10000
+
+# CertCreateCertificateContext encodings
+X509_ASN_ENCODING = 0x00000001
+PKCS_7_ASN_ENCODING = 0x00010000
+
+# These structs are only complete enough to achieve what we need.
+class CERT_CHAIN_CONTEXT(ctypes.Structure):
+ _fields_ = (
+ ("cbSize", _DWORD),
+
+ # CERT_TRUST_STATUS struct
+ ("dwErrorStatus", _DWORD),
+ ("dwInfoStatus", _DWORD),
+
+ ("cChain", _DWORD),
+ ("rgpChain", ctypes.c_void_p),
+ ("cLowerQualityChainContext", _DWORD),
+ ("rgpLowerQualityChainContext", ctypes.c_void_p),
+ ("fHasRevocationFreshnessTime", _BOOL),
+ ("dwRevocationFreshnessTime", _DWORD),
+ )
+
+class CERT_USAGE_MATCH(ctypes.Structure):
+ _fields_ = (
+ ("dwType", _DWORD),
+
+ # CERT_ENHKEY_USAGE struct
+ ("cUsageIdentifier", _DWORD),
+ ("rgpszUsageIdentifier", ctypes.c_void_p), # LPSTR *
+ )
+
+class CERT_CHAIN_PARA(ctypes.Structure):
+ _fields_ = (
+ ("cbSize", _DWORD),
+ ("RequestedUsage", CERT_USAGE_MATCH),
+ ("RequestedIssuancePolicy", CERT_USAGE_MATCH),
+ ("dwUrlRetrievalTimeout", _DWORD),
+ ("fCheckRevocationFreshnessTime", _BOOL),
+ ("dwRevocationFreshnessTime", _DWORD),
+ ("pftCacheResync", ctypes.c_void_p), # LPFILETIME
+ ("pStrongSignPara", ctypes.c_void_p), # PCCERT_STRONG_SIGN_PARA
+ ("dwStrongSignFlags", _DWORD),
+ )
+
# types of parameters of C functions used (required by pypy)
+_crypt32.CertCreateCertificateContext.argtypes = [_DWORD, # cert encoding
+ ctypes.c_char_p, # cert
+ _DWORD] # cert size
+_crypt32.CertCreateCertificateContext.restype = _PCCERT_CONTEXT
+
+_crypt32.CertGetCertificateChain.argtypes = [
+ ctypes.c_void_p, # HCERTCHAINENGINE
+ _PCCERT_CONTEXT,
+ ctypes.c_void_p, # LPFILETIME
+ ctypes.c_void_p, # HCERTSTORE
+ ctypes.c_void_p, # PCERT_CHAIN_PARA
+ _DWORD,
+ ctypes.c_void_p, # LPVOID
+ ctypes.c_void_p # PCCERT_CHAIN_CONTEXT *
+ ]
+_crypt32.CertGetCertificateChain.restype = _BOOL
+
+_crypt32.CertFreeCertificateContext.argtypes = [_PCCERT_CONTEXT]
+_crypt32.CertFreeCertificateContext.restype = _BOOL
+
_kernel32.CreateFileA.argtypes = [_LPCSTR, _DWORD, _DWORD, ctypes.c_void_p,
_DWORD, _DWORD, _HANDLE]
_kernel32.CreateFileA.restype = _HANDLE
@@ -234,6 +302,51 @@
finally:
_kernel32.CloseHandle(fh)
+def checkcertificatechain(cert, build=True):
+ '''Tests the given certificate to see if there is a complete chain to a
+ trusted root certificate. As a side effect, missing certificates are
+ downloaded and installed unless ``build=False``. True is returned if a
+ chain to a trusted root exists (even if built on the fly), otherwise
+ False. NB: A chain to a trusted root does NOT imply that the certificate
+ is valid.
+ '''
+
+ chainctxptr = ctypes.POINTER(CERT_CHAIN_CONTEXT)
+
+ pchainctx = chainctxptr()
+ chainpara = CERT_CHAIN_PARA(cbSize=ctypes.sizeof(CERT_CHAIN_PARA),
+ RequestedUsage=CERT_USAGE_MATCH())
+
+ certctx = _crypt32.CertCreateCertificateContext(X509_ASN_ENCODING, cert,
+ len(cert))
+ if certctx is None:
+ _raiseoserror('CertCreateCertificateContext')
+
+ flags = 0
+
+ if not build:
+ flags |= 0x100 # CERT_CHAIN_DISABLE_AUTH_ROOT_AUTO_UPDATE
+
+ try:
+ # Building the certificate chain will update root certs as necessary.
+ if not _crypt32.CertGetCertificateChain(None, # hChainEngine
+ certctx, # pCertContext
+ None, # pTime
+ None, # hAdditionalStore
+ ctypes.byref(chainpara),
+ flags,
+ None, # pvReserved
+ ctypes.byref(pchainctx)):
+ _raiseoserror('CertGetCertificateChain')
+
+ chainctx = pchainctx.contents
+
+ return chainctx.dwErrorStatus & CERT_TRUST_IS_PARTIAL_CHAIN == 0
+ finally:
+ if pchainctx:
+ _crypt32.CertFreeCertificateChain(pchainctx)
+ _crypt32.CertFreeCertificateContext(certctx)
+
def oslink(src, dst):
try:
if not _kernel32.CreateHardLinkA(dst, src, None):