view mercurial/hgweb/common.py @ 45076:23119371df5e

test-clone-uncompressed: use config to silence the lock warning The ui.timeout.warn option can solve the issue6237 in a cleaner way than what was done in 752da6863e39. That way, possible other unexpected error output will not be silenced. Differential Revision: https://phab.mercurial-scm.org/D8726
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Thu, 09 Jul 2020 02:16:26 +0200
parents 29adf0a087a1
children 9a5c4875a88c
line wrap: on
line source

# hgweb/common.py - Utility functions needed by hgweb_mod and hgwebdir_mod
#
# Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
# Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import base64
import errno
import mimetypes
import os
import stat

from ..pycompat import (
    getattr,
    open,
)
from .. import (
    encoding,
    pycompat,
    util,
)

httpserver = util.httpserver

HTTP_OK = 200
HTTP_CREATED = 201
HTTP_NOT_MODIFIED = 304
HTTP_BAD_REQUEST = 400
HTTP_UNAUTHORIZED = 401
HTTP_FORBIDDEN = 403
HTTP_NOT_FOUND = 404
HTTP_METHOD_NOT_ALLOWED = 405
HTTP_NOT_ACCEPTABLE = 406
HTTP_UNSUPPORTED_MEDIA_TYPE = 415
HTTP_SERVER_ERROR = 500


def ismember(ui, username, userlist):
    """Check if username is a member of userlist.

    If userlist has a single '*' member, all users are considered members.
    Can be overridden by extensions to provide more complex authorization
    schemes.
    """
    return userlist == [b'*'] or username in userlist


def checkauthz(hgweb, req, op):
    '''Check permission for operation based on request data (including
    authentication info). Return if op allowed, else raise an ErrorResponse
    exception.'''

    user = req.remoteuser

    deny_read = hgweb.configlist(b'web', b'deny_read')
    if deny_read and (not user or ismember(hgweb.repo.ui, user, deny_read)):
        raise ErrorResponse(HTTP_UNAUTHORIZED, b'read not authorized')

    allow_read = hgweb.configlist(b'web', b'allow_read')
    if allow_read and (not ismember(hgweb.repo.ui, user, allow_read)):
        raise ErrorResponse(HTTP_UNAUTHORIZED, b'read not authorized')

    if op == b'pull' and not hgweb.allowpull:
        raise ErrorResponse(HTTP_UNAUTHORIZED, b'pull not authorized')
    elif op == b'pull' or op is None:  # op is None for interface requests
        return

    # Allow LFS uploading via PUT requests
    if op == b'upload':
        if req.method != b'PUT':
            msg = b'upload requires PUT request'
            raise ErrorResponse(HTTP_METHOD_NOT_ALLOWED, msg)
    # enforce that you can only push using POST requests
    elif req.method != b'POST':
        msg = b'push requires POST request'
        raise ErrorResponse(HTTP_METHOD_NOT_ALLOWED, msg)

    # require ssl by default for pushing, auth info cannot be sniffed
    # and replayed
    if hgweb.configbool(b'web', b'push_ssl') and req.urlscheme != b'https':
        raise ErrorResponse(HTTP_FORBIDDEN, b'ssl required')

    deny = hgweb.configlist(b'web', b'deny_push')
    if deny and (not user or ismember(hgweb.repo.ui, user, deny)):
        raise ErrorResponse(HTTP_UNAUTHORIZED, b'push not authorized')

    allow = hgweb.configlist(b'web', b'allow-push')
    if not (allow and ismember(hgweb.repo.ui, user, allow)):
        raise ErrorResponse(HTTP_UNAUTHORIZED, b'push not authorized')


# Hooks for hgweb permission checks; extensions can add hooks here.
# Each hook is invoked like this: hook(hgweb, request, operation),
# where operation is either read, pull, push or upload. Hooks should either
# raise an ErrorResponse exception, or just return.
#
# It is possible to do both authentication and authorization through
# this.
permhooks = [checkauthz]


class ErrorResponse(Exception):
    def __init__(self, code, message=None, headers=None):
        if message is None:
            message = _statusmessage(code)
        Exception.__init__(self, pycompat.sysstr(message))
        self.code = code
        if headers is None:
            headers = []
        self.headers = headers
        self.message = message


class continuereader(object):
    """File object wrapper to handle HTTP 100-continue.

    This is used by servers so they automatically handle Expect: 100-continue
    request headers. On first read of the request body, the 100 Continue
    response is sent. This should trigger the client into actually sending
    the request body.
    """

    def __init__(self, f, write):
        self.f = f
        self._write = write
        self.continued = False

    def read(self, amt=-1):
        if not self.continued:
            self.continued = True
            self._write(b'HTTP/1.1 100 Continue\r\n\r\n')
        return self.f.read(amt)

    def __getattr__(self, attr):
        if attr in (b'close', b'readline', b'readlines', b'__iter__'):
            return getattr(self.f, attr)
        raise AttributeError


def _statusmessage(code):
    responses = httpserver.basehttprequesthandler.responses
    return pycompat.bytesurl(responses.get(code, ('Error', 'Unknown error'))[0])


def statusmessage(code, message=None):
    return b'%d %s' % (code, message or _statusmessage(code))


def get_stat(spath, fn):
    """stat fn if it exists, spath otherwise"""
    cl_path = os.path.join(spath, fn)
    if os.path.exists(cl_path):
        return os.stat(cl_path)
    else:
        return os.stat(spath)


def get_mtime(spath):
    return get_stat(spath, b"00changelog.i")[stat.ST_MTIME]


def ispathsafe(path):
    """Determine if a path is safe to use for filesystem access."""
    parts = path.split(b'/')
    for part in parts:
        if (
            part in (b'', pycompat.oscurdir, pycompat.ospardir)
            or pycompat.ossep in part
            or pycompat.osaltsep is not None
            and pycompat.osaltsep in part
        ):
            return False

    return True


def staticfile(directory, fname, res):
    """return a file inside directory with guessed Content-Type header

    fname always uses '/' as directory separator and isn't allowed to
    contain unusual path components.
    Content-Type is guessed using the mimetypes module.
    Return an empty string if fname is illegal or file not found.

    """
    if not ispathsafe(fname):
        return

    fpath = os.path.join(*fname.split(b'/'))
    if isinstance(directory, bytes):
        directory = [directory]
    for d in directory:
        path = os.path.join(d, fpath)
        if os.path.exists(path):
            break
    try:
        os.stat(path)
        ct = pycompat.sysbytes(
            mimetypes.guess_type(pycompat.fsdecode(path))[0] or r"text/plain"
        )
        with open(path, b'rb') as fh:
            data = fh.read()

        res.headers[b'Content-Type'] = ct
        res.setbodybytes(data)
        return res
    except TypeError:
        raise ErrorResponse(HTTP_SERVER_ERROR, b'illegal filename')
    except OSError as err:
        if err.errno == errno.ENOENT:
            raise ErrorResponse(HTTP_NOT_FOUND)
        else:
            raise ErrorResponse(
                HTTP_SERVER_ERROR, encoding.strtolocal(err.strerror)
            )


def paritygen(stripecount, offset=0):
    """count parity of horizontal stripes for easier reading"""
    if stripecount and offset:
        # account for offset, e.g. due to building the list in reverse
        count = (stripecount + offset) % stripecount
        parity = (stripecount + offset) // stripecount & 1
    else:
        count = 0
        parity = 0
    while True:
        yield parity
        count += 1
        if stripecount and count >= stripecount:
            parity = 1 - parity
            count = 0


def get_contact(config):
    """Return repo contact information or empty string.

    web.contact is the primary source, but if that is not set, try
    ui.username or $EMAIL as a fallback to display something useful.
    """
    return (
        config(b"web", b"contact")
        or config(b"ui", b"username")
        or encoding.environ.get(b"EMAIL")
        or b""
    )


def cspvalues(ui):
    """Obtain the Content-Security-Policy header and nonce value.

    Returns a 2-tuple of the CSP header value and the nonce value.

    First value is ``None`` if CSP isn't enabled. Second value is ``None``
    if CSP isn't enabled or if the CSP header doesn't need a nonce.
    """
    # Without demandimport, "import uuid" could have an immediate side-effect
    # running "ldconfig" on Linux trying to find libuuid.
    # With Python <= 2.7.12, that "ldconfig" is run via a shell and the shell
    # may pollute the terminal with:
    #
    #   shell-init: error retrieving current directory: getcwd: cannot access
    #   parent directories: No such file or directory
    #
    # Python >= 2.7.13 has fixed it by running "ldconfig" directly without a
    # shell (hg changeset a09ae70f3489).
    #
    # Moved "import uuid" from here so it's executed after we know we have
    # a sane cwd (i.e. after dispatch.py cwd check).
    #
    # We can move it back once we no longer need Python <= 2.7.12 support.
    import uuid

    # Don't allow untrusted CSP setting since it be disable protections
    # from a trusted/global source.
    csp = ui.config(b'web', b'csp', untrusted=False)
    nonce = None

    if csp and b'%nonce%' in csp:
        nonce = base64.urlsafe_b64encode(uuid.uuid4().bytes).rstrip(b'=')
        csp = csp.replace(b'%nonce%', nonce)

    return csp, nonce