view contrib/automation/hgautomation/try_server.py @ 50911:1339158a8a40

cachedlocalrepo: use sysstr for attribute name of "file of interest" The first item is an attribute name, the second one is a filename. So the first one can (and should) be a `str`.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 30 Aug 2023 11:53:13 +0200
parents c5c502bd1f70
children
line wrap: on
line source

# try_server.py - Interact with Try server
#
# Copyright 2019 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

# no-check-code because Python 3 native.

import base64
import json
import os
import subprocess
import tempfile

from .aws import AWSConnection

LAMBDA_FUNCTION = "ci-try-server-upload"


def trigger_try(c: AWSConnection, rev="."):
    """Trigger a new Try run."""
    lambda_client = c.session.client("lambda")

    cset, bundle = generate_bundle(rev=rev)

    payload = {
        "bundle": base64.b64encode(bundle).decode("utf-8"),
        "node": cset["node"],
        "branch": cset["branch"],
        "user": cset["user"],
        "message": cset["desc"],
    }

    print("resolved revision:")
    print("node: %s" % cset["node"])
    print("branch: %s" % cset["branch"])
    print("user: %s" % cset["user"])
    print("desc: %s" % cset["desc"].splitlines()[0])
    print()

    print("sending to Try...")
    res = lambda_client.invoke(
        FunctionName=LAMBDA_FUNCTION,
        InvocationType="RequestResponse",
        Payload=json.dumps(payload).encode("utf-8"),
    )

    body = json.load(res["Payload"])
    for message in body:
        print("remote: %s" % message)


def generate_bundle(rev="."):
    """Generate a bundle suitable for use by the Try service.

    Returns a tuple of revision metadata and raw Mercurial bundle data.
    """
    # `hg bundle` doesn't support streaming to stdout. So we use a temporary
    # file.
    path = None
    try:
        fd, path = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
        os.close(fd)

        args = [
            "hg",
            "bundle",
            "--type",
            "gzip-v2",
            "--base",
            "public()",
            "--rev",
            rev,
            path,
        ]

        print("generating bundle...")
        subprocess.run(args, check=True)

        with open(path, "rb") as fh:
            bundle_data = fh.read()

    finally:
        if path:
            os.unlink(path)

    args = [
        "hg",
        "log",
        "-r",
        rev,
        # We have to upload as JSON, so it won't matter if we emit binary
        # since we need to normalize to UTF-8.
        "-T",
        "json",
    ]
    res = subprocess.run(args, check=True, capture_output=True)
    return json.loads(res.stdout)[0], bundle_data