Mercurial > hg
view mercurial/pycompat.py @ 32687:12941a782928
dispatch: do not close stdout and stderr, just flush() instead
Since 3a4c0905f357 "util: always force line buffered stdout when stdout is
a tty", we have two file objects attached to the same STDOUT_FILENO. If one
is closed, the underlying file descriptor is also closed, and writing to
the other file object would crash the Python interpreter in a hard way, at
least on Windows.
So, it seems safer to not close the standard streams. This also matches
the behavior of the default sys.stdout/stderr.close(), which never closes
the FILE* streams in the C layer.
https://hg.python.org/cpython/file/v2.7.13/Python/sysmodule.c#l1401
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Sun, 23 Apr 2017 00:31:29 +0900 |
parents | c9318beb7c1a |
children | a05f3675c46a |
line wrap: on
line source
# pycompat.py - portability shim for python 3
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

"""Mercurial portability shim for python 3.

This contains aliases to hide python version-specific details from the core.
"""

from __future__ import absolute_import

import getopt
import os
import shlex
import sys

ispy3 = (sys.version_info[0] >= 3)

if not ispy3:
    import cookielib
    import cPickle as pickle
    import httplib
    import Queue as _queue
    import SocketServer as socketserver
    import xmlrpclib
else:
    import http.cookiejar as cookielib
    import http.client as httplib
    import pickle
    import queue as _queue
    import socketserver
    import xmlrpc.client as xmlrpclib

def identity(a):
    """Return the argument unchanged (no-op stand-in for a converter)"""
    return a

if ispy3:
    import builtins
    import functools
    import io
    import struct

    fsencode = os.fsencode
    fsdecode = os.fsdecode
    # Bytes versions of os.linesep, os.name, os.pathsep, os.sep and os.altsep.
    oslinesep = os.linesep.encode('ascii')
    osname = os.name.encode('ascii')
    ospathsep = os.pathsep.encode('ascii')
    ossep = os.sep.encode('ascii')
    osaltsep = os.altsep
    if osaltsep:
        # os.altsep is None on platforms without an alternative separator
        osaltsep = osaltsep.encode('ascii')
    # os.getcwd() on Python 3 returns string, but it has os.getcwdb() which
    # returns bytes.
    getcwd = os.getcwdb
    sysplatform = sys.platform.encode('ascii')
    sysexecutable = sys.executable
    if sysexecutable:
        sysexecutable = os.fsencode(sysexecutable)
    stringio = io.BytesIO
    maplist = lambda *args: list(map(*args))

    # TODO: .buffer might not exist if std streams were replaced; we'll need
    # a silly wrapper to make a bytes stream backed by a unicode one.
    stdin = sys.stdin.buffer
    stdout = sys.stdout.buffer
    stderr = sys.stderr.buffer

    # Since Python 3 converts argv to wchar_t type by Py_DecodeLocale() on Unix,
    # we can use os.fsencode() to get back bytes argv.
    #
    # https://hg.python.org/cpython/file/v3.5.1/Programs/python.c#l55
    #
    # TODO: On Windows, the native argv is wchar_t, so we'll need a different
    # workaround to simulate the Python 2 (i.e. ANSI Win32 API) behavior.
    if getattr(sys, 'argv', None) is not None:
        sysargv = list(map(os.fsencode, sys.argv))

    # int (0..255) -> single-byte bytes, like chr() on Python 2
    bytechr = struct.Struct('>B').pack

    class bytestr(bytes):
        """A bytes which mostly acts as a Python 2 str

        >>> bytestr(), bytestr(bytearray(b'foo')), bytestr(u'ascii'), bytestr(1)
        (b'', b'foo', b'ascii', b'1')
        >>> s = bytestr(b'foo')
        >>> assert s is bytestr(s)

        __bytes__() should be called if provided:

        >>> class bytesable(object):
        ...     def __bytes__(self):
        ...         return b'bytes'
        >>> bytestr(bytesable())
        b'bytes'

        There's no implicit conversion from non-ascii str as its encoding is
        unknown:

        >>> bytestr(chr(0x80)) # doctest: +ELLIPSIS
        Traceback (most recent call last):
          ...
        UnicodeEncodeError: ...

        Comparison between bytestr and bytes should work:

        >>> assert bytestr(b'foo') == b'foo'
        >>> assert b'foo' == bytestr(b'foo')
        >>> assert b'f' in bytestr(b'foo')
        >>> assert bytestr(b'f') in b'foo'

        Sliced elements should be bytes, not integer:

        >>> s[1], s[:2]
        (b'o', b'fo')
        >>> list(s), list(reversed(s))
        ([b'f', b'o', b'o'], [b'o', b'o', b'f'])

        As bytestr type isn't propagated across operations, you need to cast
        bytes to bytestr explicitly:

        >>> s = bytestr(b'foo').upper()
        >>> t = bytestr(s)
        >>> s[0], t[0]
        (70, b'F')

        Be careful to not pass a bytestr object to a function which expects
        bytearray-like behavior.

        >>> t = bytes(t)  # cast to bytes
        >>> assert type(t) is bytes
        """

        def __new__(cls, s=b''):
            if isinstance(s, bytestr):
                # already a bytestr; hand back the very same object
                return s
            if (not isinstance(s, (bytes, bytearray))
                and not hasattr(s, u'__bytes__')): # hasattr-py3-only
                # anything else goes through str() and must be pure ASCII
                s = str(s).encode(u'ascii')
            return bytes.__new__(cls, s)

        def __getitem__(self, key):
            s = bytes.__getitem__(self, key)
            if not isinstance(s, bytes):
                # integer index: bytes yields an int, convert it to one byte
                s = bytechr(s)
            return s

        def __iter__(self):
            return iterbytestr(bytes.__iter__(self))

    def iterbytestr(s):
        """Iterate bytes as if it were a str object of Python 2"""
        return map(bytechr, s)

    def sysbytes(s):
        """Convert an internal str (e.g. keyword, __doc__) back to bytes

        This never raises UnicodeEncodeError, but only ASCII characters
        can be round-trip by sysstr(sysbytes(s)).
        """
        return s.encode(u'utf-8')

    def sysstr(s):
        """Return a keyword str to be passed to Python functions such as
        getattr() and str.encode()

        This never raises UnicodeDecodeError. Non-ascii characters are
        considered invalid and mapped to arbitrary but unique code points
        such that 'sysstr(a) != sysstr(b)' for all 'a != b'.
        """
        if isinstance(s, builtins.str):
            return s
        return s.decode(u'latin-1')

    def raisewithtb(exc, tb):
        """Raise exception with the given traceback"""
        raise exc.with_traceback(tb)

    def getdoc(obj):
        """Get docstring as bytes; may be None so gettext() won't confuse it
        with _('')"""
        doc = getattr(obj, u'__doc__', None)
        if doc is None:
            return doc
        return sysbytes(doc)

    def _wrapattrfunc(f):
        """Wrap an attribute builtin so the attribute name may be bytes"""
        @functools.wraps(f)
        def w(object, name, *args):
            return f(object, sysstr(name), *args)
        return w

    # these wrappers are automagically imported by hgloader
    delattr = _wrapattrfunc(builtins.delattr)
    getattr = _wrapattrfunc(builtins.getattr)
    hasattr = _wrapattrfunc(builtins.hasattr)
    setattr = _wrapattrfunc(builtins.setattr)
    xrange = builtins.range
    unicode = str

    def open(name, mode='r', buffering=-1):
        """Call builtins.open(), accepting the mode as either bytes or str"""
        return builtins.open(name, sysstr(mode), buffering)

    # getopt.getopt() on Python 3 deals with unicodes internally so we cannot
    # pass bytes there. Passing unicodes will result in unicodes as return
    # values which we need to convert again to bytes.
    def getoptb(args, shortlist, namelist):
        """Bytes-in, bytes-out wrapper around getopt.getopt()"""
        args = [a.decode('latin-1') for a in args]
        shortlist = shortlist.decode('latin-1')
        namelist = [a.decode('latin-1') for a in namelist]
        opts, args = getopt.getopt(args, shortlist, namelist)
        opts = [(a[0].encode('latin-1'), a[1].encode('latin-1'))
                for a in opts]
        args = [a.encode('latin-1') for a in args]
        return opts, args

    # keys of keyword arguments in Python need to be strings which are
    # unicode on Python 3. This function takes keyword arguments, convert
    # the keys to str.
    def strkwargs(dic):
        """Return a copy of ``dic`` with its bytes keys decoded to str"""
        # dict.iteritems() does not exist on Python 3; items() is the
        # equivalent view.
        dic = dict((k.decode('latin-1'), v) for k, v in dic.items())
        return dic

    # keys of keyword arguments need to be unicode while passing into
    # a function. This function helps us to convert those keys back to bytes
    # again as we need to deal with bytes.
    def byteskwargs(dic):
        """Return a copy of ``dic`` with its str keys encoded to bytes"""
        # dict.iteritems() does not exist on Python 3; items() is the
        # equivalent view.
        dic = dict((k.encode('latin-1'), v) for k, v in dic.items())
        return dic

    # shlex.split() accepts unicodes on Python 3. This function takes bytes
    # argument, convert it into unicodes, pass into shlex.split(), convert the
    # returned value to bytes and return that.
    # TODO: handle shlex.shlex().
    def shlexsplit(s):
        """Bytes-in, bytes-out wrapper around shlex.split()"""
        ret = shlex.split(s.decode('latin-1'))
        return [a.encode('latin-1') for a in ret]

else:
    import cStringIO

    bytechr = chr
    bytestr = str
    iterbytestr = iter
    sysbytes = identity
    sysstr = identity

    # this can't be parsed on Python 3
    exec('def raisewithtb(exc, tb):\n'
         '    raise exc, None, tb\n')

    # Partial backport from os.py in Python 3, which only accepts bytes.
    # In Python 2, our paths should only ever be bytes, a unicode path
    # indicates a bug.
    def fsencode(filename):
        """Return ``filename`` unchanged if it is a str, else raise TypeError"""
        if isinstance(filename, str):
            return filename
        else:
            raise TypeError(
                "expect str, not %s" % type(filename).__name__)

    # In Python 2, fsdecode() has very little chance to receive bytes. So
    # it's better not to touch Python 2 part as it's already working fine.
    fsdecode = identity

    def getdoc(obj):
        """Return the object's docstring, or None if it has none"""
        return getattr(obj, '__doc__', None)

    def getoptb(args, shortlist, namelist):
        """Thin alias of getopt.getopt(); Python 2 str is already bytes"""
        return getopt.getopt(args, shortlist, namelist)

    strkwargs = identity
    byteskwargs = identity

    oslinesep = os.linesep
    osname = os.name
    ospathsep = os.pathsep
    ossep = os.sep
    osaltsep = os.altsep
    stdin = sys.stdin
    stdout = sys.stdout
    stderr = sys.stderr
    if getattr(sys, 'argv', None) is not None:
        sysargv = sys.argv
    sysplatform = sys.platform
    getcwd = os.getcwd
    sysexecutable = sys.executable
    shlexsplit = shlex.split
    stringio = cStringIO.StringIO
    maplist = map

empty = _queue.Empty
queue = _queue.Queue

class _pycompatstub(object):
    """Namespace whose attributes are looked up lazily from registered aliases

    Each alias maps an attribute name of the stub to an ``(origin, item)``
    pair. The real object is fetched with ``getattr(origin, item)`` on first
    access and then cached in the instance ``__dict__``.
    """

    def __init__(self):
        self._aliases = {}

    def _registeraliases(self, origin, items):
        """Add items that will be populated at the first access"""
        items = map(sysstr, items)
        # the alias name is the item name lowercased, with underscores removed
        self._aliases.update(
            (item.replace(sysstr('_'), sysstr('')).lower(), (origin, item))
            for item in items)

    def _registeralias(self, origin, attr, name):
        """Alias ``origin``.``attr`` as ``name``"""
        self._aliases[sysstr(name)] = (origin, sysstr(attr))

    def __getattr__(self, name):
        try:
            origin, item = self._aliases[name]
        except KeyError:
            raise AttributeError(name)
        # cache the resolved object so __getattr__ is not called again
        self.__dict__[name] = obj = getattr(origin, item)
        return obj

httpserver = _pycompatstub()
urlreq = _pycompatstub()
urlerr = _pycompatstub()
if not ispy3:
    import BaseHTTPServer
    import CGIHTTPServer
    import SimpleHTTPServer
    import urllib2
    import urllib
    import urlparse
    urlreq._registeraliases(urllib, (
        "addclosehook",
        "addinfourl",
        "ftpwrapper",
        "pathname2url",
        "quote",
        "splitattr",
        "splitpasswd",
        "splitport",
        "splituser",
        "unquote",
        "url2pathname",
        "urlencode",
    ))
    urlreq._registeraliases(urllib2, (
        "AbstractHTTPHandler",
        "BaseHandler",
        "build_opener",
        "FileHandler",
        "FTPHandler",
        "HTTPBasicAuthHandler",
        "HTTPDigestAuthHandler",
        "HTTPHandler",
        "HTTPPasswordMgrWithDefaultRealm",
        "HTTPSHandler",
        "install_opener",
        "ProxyHandler",
        "Request",
        "urlopen",
    ))
    urlreq._registeraliases(urlparse, (
        "urlparse",
        "urlunparse",
    ))
    urlerr._registeraliases(urllib2, (
        "HTTPError",
        "URLError",
    ))
    httpserver._registeraliases(BaseHTTPServer, (
        "HTTPServer",
        "BaseHTTPRequestHandler",
    ))
    httpserver._registeraliases(SimpleHTTPServer, (
        "SimpleHTTPRequestHandler",
    ))
    httpserver._registeraliases(CGIHTTPServer, (
        "CGIHTTPRequestHandler",
    ))

else:
    import urllib.parse
    urlreq._registeraliases(urllib.parse, (
        "splitattr",
        "splitpasswd",
        "splitport",
        "splituser",
        "urlparse",
        "urlunparse",
    ))
    urlreq._registeralias(urllib.parse, "unquote_to_bytes", "unquote")
    import urllib.request
    urlreq._registeraliases(urllib.request, (
        "AbstractHTTPHandler",
        "BaseHandler",
        "build_opener",
        "FileHandler",
        "FTPHandler",
        "ftpwrapper",
        "HTTPHandler",
        "HTTPSHandler",
        "install_opener",
        "pathname2url",
        "HTTPBasicAuthHandler",
        "HTTPDigestAuthHandler",
        "HTTPPasswordMgrWithDefaultRealm",
        "ProxyHandler",
        "Request",
        "url2pathname",
        "urlopen",
    ))
    import urllib.response
    urlreq._registeraliases(urllib.response, (
        "addclosehook",
        "addinfourl",
    ))
    import urllib.error
    urlerr._registeraliases(urllib.error, (
        "HTTPError",
        "URLError",
    ))
    import http.server
    httpserver._registeraliases(http.server, (
        "HTTPServer",
        "BaseHTTPRequestHandler",
        "SimpleHTTPRequestHandler",
        "CGIHTTPRequestHandler",
    ))

    # urllib.parse.quote() accepts both str and bytes, decodes bytes
    # (if necessary), and returns str. This is wonky. We provide a custom
    # implementation that only accepts bytes and emits bytes.
    def quote(s, safe=r'/'):
        """Percent-quote bytes ``s`` and return the result as bytes"""
        s = urllib.parse.quote_from_bytes(s, safe=safe)
        return s.encode('ascii', 'strict')

    # urllib.parse.urlencode() returns str. We use this function to make
    # sure we return bytes.
    def urlencode(query, doseq=False):
        """Call urllib.parse.urlencode() and return the result as bytes"""
        s = urllib.parse.urlencode(query, doseq=doseq)
        return s.encode('ascii')

    urlreq.quote = quote
    urlreq.urlencode = urlencode