view mercurial/hgweb/wsgicgi.py @ 49803:55d45d0de4e7

typing: add type hints to pycompat.bytestr The problem with leaving pytype to its own devices here was that for functions that returned a bytestr, pytype inferred `Union[bytes, int]`. It now accepts that it can be treated as plain bytes. I wasn't able to figure out the arg type for `__getitem__`: `SupportsIndex` (which PyCharm indicated is how the superclass function is typed) got flagged: File "/mnt/c/Users/Matt/hg/mercurial/pycompat.py", line 236, in __getitem__: unsupported operand type(s) for item retrieval: bytestr and SupportsIndex [unsupported-operands] Function __getitem__ on bytestr expects int But some caller got flagged when I marked it as `int`. There are some minor spillover problems elsewhere: pytype doesn't seem to recognize that `bytes.startswith()` can optionally take a 3rd and 4th arg, so those few places have the warning disabled. It also flags where the tar API is being abused, but that would be a tricky refactor (and would require typing extensions until py3.7 is dropped), so disable those too.
author Matt Harbison <matt_harbison@yahoo.com>
date Wed, 14 Dec 2022 01:51:33 -0500
parents f254fc73d956
children 18c8c18993f0 04bfcb416745
line wrap: on
line source

# hgweb/wsgicgi.py - CGI->WSGI translator
#
# Copyright 2006 Eric Hopper <hopper@omnifarious.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
#
# This was originally copied from the public domain code at
# http://www.python.org/dev/peps/pep-0333/#the-server-gateway-side


import os

from ..pycompat import getattr
from .. import pycompat

from ..utils import procutil

from . import common


def launch(application):
    """Run the WSGI ``application`` once, acting as a CGI gateway.

    The WSGI environ is built from ``os.environ`` plus the ``wsgi.*`` keys.
    The response is written to ``procutil.stdout`` in CGI form: a
    ``Status:`` line, the response headers, a blank line, then each body
    chunk (flushed after every write).  If the iterable returned by the
    application has a ``close()`` method it is always called at the end.

    Raises AssertionError if the application writes body data before calling
    ``start_response()``, or calls ``start_response()`` a second time without
    passing ``exc_info``.
    """
    procutil.setbinary(procutil.stdin)
    procutil.setbinary(procutil.stdout)

    environ = dict(os.environ.items())  # re-exports
    environ.setdefault('PATH_INFO', '')
    if environ.get('SERVER_SOFTWARE', '').startswith('Microsoft-IIS'):
        # IIS includes script_name in PATH_INFO
        scriptname = environ['SCRIPT_NAME']
        if environ['PATH_INFO'].startswith(scriptname):
            environ['PATH_INFO'] = environ['PATH_INFO'][len(scriptname) :]

    stdin = procutil.stdin
    if environ.get('HTTP_EXPECT', '').lower() == '100-continue':
        # NOTE(review): continuereader presumably emits the interim
        # "100 Continue" response through the given write callable before
        # the body is read -- confirm against hgweb.common.
        stdin = common.continuereader(stdin, procutil.stdout.write)

    environ['wsgi.input'] = stdin
    environ['wsgi.errors'] = procutil.stderr
    environ['wsgi.version'] = (1, 0)
    environ['wsgi.multithread'] = False
    environ['wsgi.multiprocess'] = True
    environ['wsgi.run_once'] = True

    if environ.get('HTTPS', 'off').lower() in ('on', '1', 'yes'):
        environ['wsgi.url_scheme'] = 'https'
    else:
        environ['wsgi.url_scheme'] = 'http'

    # Both lists are empty or hold [status, response_headers].  They are
    # mutated in place so the nested functions can share state without
    # rebinding names.
    headers_set = []
    headers_sent = []
    out = procutil.stdout

    def write(data):
        """Write ``data`` to the output, emitting pending headers first."""
        if not headers_set:
            raise AssertionError(b"write() before start_response()")

        elif not headers_sent:
            # Before the first output, send the stored headers
            status, response_headers = headers_sent[:] = headers_set
            out.write(b'Status: %s\r\n' % pycompat.bytesurl(status))
            for hk, hv in response_headers:
                out.write(
                    b'%s: %s\r\n'
                    % (pycompat.bytesurl(hk), pycompat.bytesurl(hv))
                )
            out.write(b'\r\n')

        out.write(data)
        out.flush()

    def start_response(status, response_headers, exc_info=None):
        """Record the status and headers; return the ``write`` callable.

        When ``exc_info`` is given and headers were already sent, the
        original exception is re-raised because the response can no longer
        be replaced.
        """
        if exc_info:
            try:
                if headers_sent:
                    # Re-raise original exception if headers sent.  Raise
                    # the original instance with its traceback rather than
                    # building a new exception from (value, traceback),
                    # which would drop the traceback and can fail for
                    # exception classes with a different constructor.
                    raise exc_info[1].with_traceback(exc_info[2])
            finally:
                del exc_info  # avoid dangling circular ref
        elif headers_set:
            raise AssertionError(b"Headers already set!")

        headers_set[:] = [status, response_headers]
        return write

    content = application(environ, start_response)
    try:
        for chunk in content:
            write(chunk)
        if not headers_sent:
            write(b'')  # send headers now if body was empty
    finally:
        # Call close() when the response iterable provides it.
        getattr(content, 'close', lambda: None)()