mercurial/win32.py
changeset 33492 14af04391fb9
parent 33419 7c33adc823e0
child 34040 d5b2beca16c0
--- a/mercurial/win32.py	Mon Jul 10 19:40:23 2017 +0200
+++ b/mercurial/win32.py	Wed Mar 29 23:45:23 2017 -0400
@@ -22,6 +22,7 @@
 _kernel32 = ctypes.windll.kernel32
 _advapi32 = ctypes.windll.advapi32
 _user32 = ctypes.windll.user32
+_crypt32 = ctypes.windll.crypt32
 
 _BOOL = ctypes.c_long
 _WORD = ctypes.c_ushort
@@ -31,6 +32,7 @@
 _LPCSTR = _LPSTR = ctypes.c_char_p
 _HANDLE = ctypes.c_void_p
 _HWND = _HANDLE
+_PCCERT_CONTEXT = ctypes.c_void_p
 
 _INVALID_HANDLE_VALUE = _HANDLE(-1).value
 
@@ -134,8 +136,74 @@
 _STD_OUTPUT_HANDLE = _DWORD(-11).value
 _STD_ERROR_HANDLE = _DWORD(-12).value
 
+# CERT_TRUST_STATUS dwErrorStatus
+CERT_TRUST_IS_PARTIAL_CHAIN = 0x10000
+
+# CertCreateCertificateContext encodings
+X509_ASN_ENCODING = 0x00000001
+PKCS_7_ASN_ENCODING = 0x00010000
+
+# These structs are only complete enough to achieve what we need.
+class CERT_CHAIN_CONTEXT(ctypes.Structure):
+    _fields_ = (
+        ("cbSize", _DWORD),
+
+        # CERT_TRUST_STATUS struct
+        ("dwErrorStatus", _DWORD),
+        ("dwInfoStatus", _DWORD),
+
+        ("cChain", _DWORD),
+        ("rgpChain", ctypes.c_void_p),
+        ("cLowerQualityChainContext", _DWORD),
+        ("rgpLowerQualityChainContext", ctypes.c_void_p),
+        ("fHasRevocationFreshnessTime", _BOOL),
+        ("dwRevocationFreshnessTime", _DWORD),
+    )
+
+class CERT_USAGE_MATCH(ctypes.Structure):
+    _fields_ = (
+        ("dwType", _DWORD),
+
+         # CERT_ENHKEY_USAGE struct
+        ("cUsageIdentifier", _DWORD),
+        ("rgpszUsageIdentifier", ctypes.c_void_p), # LPSTR *
+    )
+
+class CERT_CHAIN_PARA(ctypes.Structure):
+    _fields_ = (
+        ("cbSize", _DWORD),
+        ("RequestedUsage", CERT_USAGE_MATCH),
+        ("RequestedIssuancePolicy", CERT_USAGE_MATCH),
+        ("dwUrlRetrievalTimeout", _DWORD),
+        ("fCheckRevocationFreshnessTime", _BOOL),
+        ("dwRevocationFreshnessTime", _DWORD),
+        ("pftCacheResync", ctypes.c_void_p), # LPFILETIME
+        ("pStrongSignPara", ctypes.c_void_p), # PCCERT_STRONG_SIGN_PARA
+        ("dwStrongSignFlags", _DWORD),
+    )
+
 # types of parameters of C functions used (required by pypy)
 
+_crypt32.CertCreateCertificateContext.argtypes = [_DWORD, # cert encoding
+                                                  ctypes.c_char_p, # cert
+                                                  _DWORD] # cert size
+_crypt32.CertCreateCertificateContext.restype = _PCCERT_CONTEXT
+
+_crypt32.CertGetCertificateChain.argtypes = [
+        ctypes.c_void_p, # HCERTCHAINENGINE
+        _PCCERT_CONTEXT,
+        ctypes.c_void_p, # LPFILETIME
+        ctypes.c_void_p, # HCERTSTORE
+        ctypes.c_void_p, # PCERT_CHAIN_PARA
+        _DWORD,
+        ctypes.c_void_p, # LPVOID
+        ctypes.c_void_p  # PCCERT_CHAIN_CONTEXT *
+    ]
+_crypt32.CertGetCertificateChain.restype = _BOOL
+
+_crypt32.CertFreeCertificateContext.argtypes = [_PCCERT_CONTEXT]
+_crypt32.CertFreeCertificateContext.restype = _BOOL
+
 _kernel32.CreateFileA.argtypes = [_LPCSTR, _DWORD, _DWORD, ctypes.c_void_p,
     _DWORD, _DWORD, _HANDLE]
 _kernel32.CreateFileA.restype = _HANDLE
@@ -234,6 +302,51 @@
     finally:
         _kernel32.CloseHandle(fh)
 
+def checkcertificatechain(cert, build=True):
+    '''Tests the given certificate to see if there is a complete chain to a
+       trusted root certificate.  As a side effect, missing certificates are
+       downloaded and installed unless ``build=False``.  True is returned if a
+       chain to a trusted root exists (even if built on the fly), otherwise
+       False.  NB: A chain to a trusted root does NOT imply that the certificate
+       is valid.
+    '''
+
+    chainctxptr = ctypes.POINTER(CERT_CHAIN_CONTEXT)
+
+    pchainctx = chainctxptr()
+    chainpara = CERT_CHAIN_PARA(cbSize=ctypes.sizeof(CERT_CHAIN_PARA),
+                                RequestedUsage=CERT_USAGE_MATCH())
+
+    certctx = _crypt32.CertCreateCertificateContext(X509_ASN_ENCODING, cert,
+                                                    len(cert))
+    if certctx is None:
+        _raiseoserror('CertCreateCertificateContext')
+
+    flags = 0
+
+    if not build:
+        flags |= 0x100  # CERT_CHAIN_DISABLE_AUTH_ROOT_AUTO_UPDATE
+
+    try:
+        # Building the certificate chain will update root certs as necessary.
+        if not _crypt32.CertGetCertificateChain(None,      # hChainEngine
+                                                certctx,   # pCertContext
+                                                None,      # pTime
+                                                None,      # hAdditionalStore
+                                                ctypes.byref(chainpara),
+                                                flags,
+                                                None,      # pvReserved
+                                                ctypes.byref(pchainctx)):
+            _raiseoserror('CertGetCertificateChain')
+
+        chainctx = pchainctx.contents
+
+        return chainctx.dwErrorStatus & CERT_TRUST_IS_PARTIAL_CHAIN == 0
+    finally:
+        if pchainctx:
+            _crypt32.CertFreeCertificateChain(pchainctx)
+        _crypt32.CertFreeCertificateContext(certctx)
+
 def oslink(src, dst):
     try:
         if not _kernel32.CreateHardLinkA(dst, src, None):