Mercurial > hg
view contrib/automation/hgautomation/winrm.py @ 49930:e98fd81bb151
rust-clippy: fix most warnings in `hg-core`
All of these are simple changes that for the most part are clear improvements
and the rest are at most equivalent.
The remaining warnings have to be fixed either with a bigger refactor like for
the nested "revlog" module, or in the dependency `bytes-cast`, which we own.
This will be done sometime in the future.
author | Raphaël Gomès <rgomes@octobus.net> |
---|---|
date | Mon, 09 Jan 2023 19:18:43 +0100 |
parents | 6a350194de7f |
children |
line wrap: on
line source
# winrm.py - Interact with Windows Remote Management (WinRM)
#
# Copyright 2019 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

# no-check-code because Python 3 native.

import logging
import pprint
import time

from pypsrp.client import Client
from pypsrp.powershell import (
    PowerShell,
    PSInvocationState,
    RunspacePool,
)
import requests.exceptions


logger = logging.getLogger(__name__)


def wait_for_winrm(host, username, password, timeout=180, ssl=False):
    """Block until a WinRM connection to ``host`` succeeds.

    A ``pypsrp.client.Client`` is created (with a 5 second connection
    timeout) and a trivial PowerShell command is executed through it. If
    that raises ``requests.exceptions.ConnectionError``, the attempt is
    repeated after a 1 second sleep.

    The deadline (``timeout`` seconds after entry) is only checked after a
    failed attempt; once it has passed, the ``ConnectionError`` from that
    attempt is re-raised. Any other exception propagates immediately.

    Returns the connected ``pypsrp.client.Client`` instance.
    """
    deadline = time.time() + timeout

    while True:
        try:
            client = Client(
                host,
                username=username,
                password=password,
                ssl=ssl,
                connection_timeout=5,
            )
            # Round-trip a no-op command to prove the service really answers.
            client.execute_ps("Write-Host 'Hello, World!'")
        except requests.exceptions.ConnectionError:
            if time.time() >= deadline:
                raise

            time.sleep(1)
        else:
            return client


def format_object(o):
    """Return a printable string for ``o``.

    ``str`` instances are returned unchanged. Anything else goes through
    ``str()``; if that raises ``AttributeError`` or ``TypeError``, the
    pretty-printed ``o.extended_properties`` is returned instead.
    """
    if isinstance(o, str):
        return o

    try:
        return str(o)
    except (AttributeError, TypeError):
        return pprint.pformat(o.extended_properties)


def _print_objects(objects):
    """Print each item of ``objects`` on its own line via format_object()."""
    for item in objects:
        print(format_object(item))


def run_powershell(client, script):
    """Run ``script`` over ``client.wsman`` and print its output.

    The script is invoked asynchronously in a fresh ``RunspacePool``. While
    the invocation state is ``RUNNING`` it is polled, and the output
    collected so far is printed and then cleared. Output still present after
    ``end_invoke()`` is printed as well.

    Raises ``Exception`` carrying the formatted error stream if the final
    invocation state is ``FAILED``.
    """
    with RunspacePool(client.wsman) as pool:
        ps = PowerShell(pool)
        ps.add_script(script)
        ps.begin_invoke()

        while ps.state == PSInvocationState.RUNNING:
            ps.poll_invoke()
            _print_objects(ps.output)
            # Empty the list in place so already-printed objects are not
            # printed again on the next pass or after end_invoke().
            ps.output[:] = []

        ps.end_invoke()
        _print_objects(ps.output)

        if ps.state == PSInvocationState.FAILED:
            errors = ' '.join(format_object(e) for e in ps.streams.error)
            raise Exception('PowerShell execution failed: %s' % errors)