view tests/get-with-headers.py @ 51925:3a90a6fd710d

dirstate: subclass the new dirstate Protocol class Behold the chaos that ensues. We'll use the generated *.pyi files to apply type annotations to the interface, and see how much agrees with the documentation. Since the CamelCase name was used to try to work around pytype issues with zope interfaces and is a new innovation this cycle (see c1d7ac70980b), drop the CamelCase name. I think the Protocol classes *should* be CamelCase, but that can be done later in one pass. For now, the CamelCase alias is extra noise in the *.pyi files.
author Matt Harbison <matt_harbison@yahoo.com>
date Thu, 26 Sep 2024 18:52:46 -0400
parents cd125eef4388
children
line wrap: on
line source

#!/usr/bin/env python

"""This does HTTP requests (GET by default) given a host:port and path and
returns a subset of the headers plus the body of the result."""


import argparse
import json
import os
import sys

from mercurial import (
    pycompat,
    util,
)

# HTTP client module as re-exported by mercurial.util.
httplib = util.httplib

# Where msvcrt can be imported, switch stdout/stderr to binary mode
# (os.O_BINARY) so the bytes written below are passed through unchanged.
try:
    import msvcrt
except ImportError:
    # No msvcrt on this platform: nothing to adjust.
    pass
else:
    msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
    msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)

# All output is written as bytes; use the underlying binary stream when
# sys.stdout exposes one, otherwise fall back to sys.stdout itself.
stdout = sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout

parser = argparse.ArgumentParser()

# Optional switches and settings.
parser.add_argument('--twice', action='store_true')
parser.add_argument('--headeronly', action='store_true')
parser.add_argument('--json', action='store_true')
parser.add_argument('--hgproto')
parser.add_argument(
    '--requestheader',
    nargs='*',
    default=[],
    help=(
        'Send an additional HTTP request header. '
        'Argument value is <header>=<value>'
    ),
)
parser.add_argument('--bodyfile', help='Write HTTP response body to a file')
parser.add_argument('--method', default='GET', help='HTTP method to use')

# Positionals: host (passed to HTTPConnection), request path without the
# leading slash, and the names of the response headers to display.
parser.add_argument('host')
parser.add_argument('path')
parser.add_argument('show', nargs='*')

args = parser.parse_args()

# Module-level copies of the options that request() reads.
twice, headeronly, formatjson = args.twice, args.headeronly, args.json
hgproto, requestheaders = args.hgproto, args.requestheader

# ETag captured from a previous response; sent back as If-None-Match.
tag = None


def _writebody(bodyfh, data):
    """Write the response body ``data`` to the binary file object ``bodyfh``.

    With ``--json``, the body is parsed and re-serialized with sorted keys
    and a two-space indent; otherwise the raw bytes are written unchanged.
    """
    if not formatjson:
        bodyfh.write(data)
        return

    # Pretty print JSON. This also has the beneficial side-effect
    # of verifying emitted JSON is well-formed.
    parsed = pycompat.json_loads(data)
    # Strip trailing whitespace from every line of the json.dumps() output
    # to make tests easier to write.
    for line in json.dumps(parsed, sort_keys=True, indent=2).splitlines():
        bodyfh.write(pycompat.sysbytes(line.rstrip()))
        bodyfh.write(b'\n')


def request(method, host, path, show):
    """Perform one HTTP request and print the status, headers and body.

    ``method`` is the HTTP method, ``host`` is handed to
    ``httplib.HTTPConnection`` and ``path`` is the request path without its
    leading slash. ``show`` lists the response headers to print; if its
    first element is ``'-'`` the meaning is inverted and every response
    header whose lowercased name is not in ``show`` is printed instead, in
    sorted order.

    The status line and the selected headers go to ``stdout``. Unless
    ``--headeronly`` was given, a blank line and the body follow; the body
    goes to the ``--bodyfile`` file when one was requested.

    When ``--twice`` is active and the response carries an ``ETag`` header,
    its value is stored in the module-level ``tag`` so that the next call
    sends it as ``If-None-Match``.

    Returns the numeric HTTP status of the response.
    """
    assert not path.startswith('/'), path
    global tag
    headers = {}
    if tag:
        headers['If-None-Match'] = tag
    if hgproto:
        headers['X-HgProto-1'] = hgproto

    # Each --requestheader value has the form <header>=<value>.
    for header in requestheaders:
        key, value = header.split('=', 1)
        headers[key] = value

    conn = httplib.HTTPConnection(host)
    # Close the connection even when the request or the output fails.
    try:
        conn.request(method, '/' + path, None, headers)
        response = conn.getresponse()
        stdout.write(
            b'%d %s\n' % (response.status, response.reason.encode('ascii'))
        )
        if show[:1] == ['-']:
            show = sorted(
                h for h, v in response.getheaders() if h.lower() not in show
            )
        for h in [h.lower() for h in show]:
            headervalue = response.getheader(h, None)
            if headervalue is not None:
                stdout.write(
                    b"%s: %s\n"
                    % (h.encode('ascii'), headervalue.encode('ascii'))
                )
        if headeronly:
            # Still read the body to prevent Windows from being unhappy
            # about it (this might fix some flakiness in
            # test-hgweb-filelog.t on Windows).
            response.read()
        else:
            stdout.write(b'\n')
            data = response.read()

            if args.bodyfile:
                # The context manager closes the file even if writing or
                # JSON parsing raises.
                with open(args.bodyfile, 'wb') as bodyfh:
                    _writebody(bodyfh, data)
            else:
                _writebody(stdout, data)

        if twice and response.getheader('ETag', None):
            tag = response.getheader('ETag')
    finally:
        # further try to please the windows-flakyness deity
        conn.close()

    return response.status


# Issue the request; with --twice, repeat it so the second request can carry
# the ETag captured from the first one. Only the last status is kept.
status = request(args.method, args.host, args.path, args.show)
if twice:
    status = request(args.method, args.host, args.path, args.show)

# Exit 0 when the final status lies in the 200..305 range, 1 otherwise.
sys.exit(0 if 200 <= status <= 305 else 1)