Mercurial > hg
diff mercurial/util.py @ 14687:15200b46165b stable
merge default branch into stable to mark the start of the code freeze
author | Matt Mackall <mpm@selenic.com> |
---|---|
date | Sat, 18 Jun 2011 17:03:01 -0500 |
parents | 406b6d7bdcb9 |
children | 388af80c058b |
line wrap: on
line diff
--- a/mercurial/util.py Wed Jun 01 16:32:48 2011 -0500 +++ b/mercurial/util.py Sat Jun 18 17:03:01 2011 -0500 @@ -16,8 +16,8 @@ from i18n import _ import error, osutil, encoding import errno, re, shutil, sys, tempfile, traceback -import os, stat, time, calendar, textwrap, unicodedata, signal -import imp, socket +import os, time, calendar, textwrap, unicodedata, signal +import imp, socket, urllib # Python compatibility @@ -197,7 +197,7 @@ code = 0 if code: raise Abort(_("command '%s' failed: %s") % - (cmd, explain_exit(code))) + (cmd, explainexit(code))) fp = open(outname, 'rb') r = fp.read() fp.close() @@ -206,12 +206,12 @@ try: if inname: os.unlink(inname) - except: + except OSError: pass try: if outname: os.unlink(outname) - except: + except OSError: pass filtertable = { @@ -295,57 +295,9 @@ b.reverse() return os.sep.join((['..'] * len(a)) + b) or '.' -def canonpath(root, cwd, myname, auditor=None): - """return the canonical path of myname, given cwd and root""" - if endswithsep(root): - rootsep = root - else: - rootsep = root + os.sep - name = myname - if not os.path.isabs(name): - name = os.path.join(root, cwd, name) - name = os.path.normpath(name) - if auditor is None: - auditor = path_auditor(root) - if name != rootsep and name.startswith(rootsep): - name = name[len(rootsep):] - auditor(name) - return pconvert(name) - elif name == root: - return '' - else: - # Determine whether `name' is in the hierarchy at or beneath `root', - # by iterating name=dirname(name) until that causes no change (can't - # check name == '/', because that doesn't work on windows). For each - # `name', compare dev/inode numbers. If they match, the list `rel' - # holds the reversed list of components making up the relative file - # name we want. - root_st = os.stat(root) - rel = [] - while True: - try: - name_st = os.stat(name) - except OSError: - break - if samestat(name_st, root_st): - if not rel: - # name was actually the same as root (maybe a symlink) - return '' - rel.reverse() - name = os.path.join(*rel) - auditor(name) - return pconvert(name) - dirname, basename = os.path.split(name) - rel.append(basename) - if dirname == name: - break - name = dirname - - raise Abort('%s not under root' % myname) - _hgexecutable = None -def main_is_frozen(): +def mainfrozen(): """return True if we are a frozen executable. The code supports py2exe (most common, Windows only) and tools/freeze @@ -363,15 +315,15 @@ if _hgexecutable is None: hg = os.environ.get('HG') if hg: - set_hgexecutable(hg) - elif main_is_frozen(): - set_hgexecutable(sys.executable) + _sethgexecutable(hg) + elif mainfrozen(): + _sethgexecutable(sys.executable) else: - exe = find_exe('hg') or os.path.basename(sys.argv[0]) - set_hgexecutable(exe) + exe = findexe('hg') or os.path.basename(sys.argv[0]) + _sethgexecutable(exe) return _hgexecutable -def set_hgexecutable(path): +def _sethgexecutable(path): """set location of the 'hg' executable""" global _hgexecutable _hgexecutable = path @@ -402,7 +354,7 @@ env = dict(os.environ) env.update((k, py2shell(v)) for k, v in environ.iteritems()) env['HG'] = hgexecutable() - if out is None: + if out is None or out == sys.__stdout__: rc = subprocess.call(cmd, shell=True, close_fds=closefds, env=env, cwd=cwd) else: @@ -417,7 +369,7 @@ rc = 0 if rc and onerr: errmsg = '%s %s' % (os.path.basename(origcmd.split(None, 1)[0]), - explain_exit(rc)[0]) + explainexit(rc)[0]) if errprefix: errmsg = '%s: %s' % (errprefix, errmsg) try: @@ -438,6 +390,9 @@ return check +def makedir(path, notindexed): + os.mkdir(path) + def unlinkpath(f): """unlink and remove the directory if it is empty""" os.unlink(f) @@ -452,7 +407,7 @@ if os.path.islink(src): try: os.unlink(dest) - except: + except OSError: pass os.symlink(os.readlink(src), dest) else: @@ -480,7 +435,7 @@ else: if hardlink: try: - os_link(src, dst) + oslink(src, dst) except (IOError, OSError): hardlink = False shutil.copy(src, dst) @@ -490,78 +445,49 @@ return hardlink, num -class path_auditor(object): - '''ensure that a filesystem path contains no banned components. - the following properties of a path are checked: - - - ends with a directory separator - - under top-level .hg - - starts at the root of a windows drive - - contains ".." - - traverses a symlink (e.g. a/symlink_here/b) - - inside a nested repository (a callback can be used to approve - some nested repositories, e.g., subrepositories) - ''' - - def __init__(self, root, callback=None): - self.audited = set() - self.auditeddir = set() - self.root = root - self.callback = callback +_winreservednames = '''con prn aux nul + com1 com2 com3 com4 com5 com6 com7 com8 com9 + lpt1 lpt2 lpt3 lpt4 lpt5 lpt6 lpt7 lpt8 lpt9'''.split() +_winreservedchars = ':*?"<>|' +def checkwinfilename(path): + '''Check that the base-relative path is a valid filename on Windows. + Returns None if the path is ok, or a UI string describing the problem. - def __call__(self, path): - if path in self.audited: - return - # AIX ignores "/" at end of path, others raise EISDIR. - if endswithsep(path): - raise Abort(_("path ends in directory separator: %s") % path) - normpath = os.path.normcase(path) - parts = splitpath(normpath) - if (os.path.splitdrive(path)[0] - or parts[0].lower() in ('.hg', '.hg.', '') - or os.pardir in parts): - raise Abort(_("path contains illegal component: %s") % path) - if '.hg' in path.lower(): - lparts = [p.lower() for p in parts] - for p in '.hg', '.hg.': - if p in lparts[1:]: - pos = lparts.index(p) - base = os.path.join(*parts[:pos]) - raise Abort(_('path %r is inside repo %r') % (path, base)) - def check(prefix): - curpath = os.path.join(self.root, prefix) - try: - st = os.lstat(curpath) - except OSError, err: - # EINVAL can be raised as invalid path syntax under win32. - # They must be ignored for patterns can be checked too. - if err.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL): - raise - else: - if stat.S_ISLNK(st.st_mode): - raise Abort(_('path %r traverses symbolic link %r') % - (path, prefix)) - elif (stat.S_ISDIR(st.st_mode) and - os.path.isdir(os.path.join(curpath, '.hg'))): - if not self.callback or not self.callback(curpath): - raise Abort(_('path %r is inside repo %r') % - (path, prefix)) - parts.pop() - prefixes = [] - while parts: - prefix = os.sep.join(parts) - if prefix in self.auditeddir: - break - check(prefix) - prefixes.append(prefix) - parts.pop() + >>> checkwinfilename("just/a/normal/path") + >>> checkwinfilename("foo/bar/con.xml") + "filename contains 'con', which is reserved on Windows" + >>> checkwinfilename("foo/con.xml/bar") + "filename contains 'con', which is reserved on Windows" + >>> checkwinfilename("foo/bar/xml.con") + >>> checkwinfilename("foo/bar/AUX/bla.txt") + "filename contains 'AUX', which is reserved on Windows" + >>> checkwinfilename("foo/bar/bla:.txt") + "filename contains ':', which is reserved on Windows" + >>> checkwinfilename("foo/bar/b\07la.txt") + "filename contains '\\\\x07', which is invalid on Windows" + >>> checkwinfilename("foo/bar/bla ") + "filename ends with ' ', which is not allowed on Windows" + ''' + for n in path.replace('\\', '/').split('/'): + if not n: + continue + for c in n: + if c in _winreservedchars: + return _("filename contains '%s', which is reserved " + "on Windows") % c + if ord(c) <= 31: + return _("filename contains %r, which is invalid " + "on Windows") % c + base = n.split('.')[0] + if base and base.lower() in _winreservednames: + return _("filename contains '%s', which is reserved " + "on Windows") % base + t = n[-1] + if t in '. ': + return _("filename ends with '%s', which is not allowed " + "on Windows") % t - self.audited.add(path) - # only add prefixes to the cache after checking everything: we don't - # want to add "foo/bar/baz" before checking if there's a "foo/.hg" - self.auditeddir.update(prefixes) - -def lookup_reg(key, name=None, scope=None): +def lookupreg(key, name=None, scope=None): return None def hidewindow(): @@ -573,6 +499,7 @@ pass if os.name == 'nt': + checkosfilename = checkwinfilename from windows import * else: from posix import * @@ -629,7 +556,7 @@ if s2 == s1: return False return True - except: + except OSError: return True _fspathcache = {} @@ -680,45 +607,6 @@ return ''.join(result) -def checkexec(path): - """ - Check whether the given path is on a filesystem with UNIX-like exec flags - - Requires a directory (like /foo/.hg) - """ - - # VFAT on some Linux versions can flip mode but it doesn't persist - # a FS remount. Frequently we can detect it if files are created - # with exec bit on. - - try: - EXECFLAGS = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH - fh, fn = tempfile.mkstemp(dir=path, prefix='hg-checkexec-') - try: - os.close(fh) - m = os.stat(fn).st_mode & 0777 - new_file_has_exec = m & EXECFLAGS - os.chmod(fn, m ^ EXECFLAGS) - exec_flags_cannot_flip = ((os.stat(fn).st_mode & 0777) == m) - finally: - os.unlink(fn) - except (IOError, OSError): - # we don't care, the user probably won't be able to commit anyway - return False - return not (new_file_has_exec or exec_flags_cannot_flip) - -def checklink(path): - """check whether the given path is on a symlink-capable filesystem""" - # mktemp is not racy because symlink creation will fail if the - # file already exists - name = tempfile.mktemp(dir=path, prefix='hg-checklink-') - try: - os.symlink(".", name) - os.unlink(name) - return True - except (OSError, AttributeError): - return False - def checknlink(testfile): '''check whether hardlink count reporting works properly''' @@ -736,7 +624,7 @@ fd = None try: try: - os_link(f1, f2) + oslink(f1, f2) except OSError: return False @@ -769,7 +657,18 @@ def gui(): '''Are we running in a GUI?''' - return os.name == "nt" or os.name == "mac" or os.environ.get("DISPLAY") + if sys.platform == 'darwin': + if 'SSH_CONNECTION' in os.environ: + # handle SSH access to a box where the user is logged in + return False + elif getattr(osutil, 'isgui', None): + # check if a CoreGraphics session is available + return osutil.isgui() + else: + # pure build; use a safe default + return True + else: + return os.name == "nt" or os.environ.get("DISPLAY") def mktempcopy(name, emptyok=False, createmode=None): """Create a temporary file with the same contents from name @@ -820,38 +719,41 @@ return temp class atomictempfile(object): - """file-like object that atomically updates a file + '''writeable file object that atomically updates a file - All writes will be redirected to a temporary copy of the original - file. When rename is called, the copy is renamed to the original - name, making the changes visible. - """ + All writes will go to a temporary copy of the original file. Call + rename() when you are done writing, and atomictempfile will rename + the temporary copy to the original name, making the changes visible. + + Unlike other file-like objects, close() discards your writes by + simply deleting the temporary file. + ''' def __init__(self, name, mode='w+b', createmode=None): - self.__name = name - self._fp = None - self.temp = mktempcopy(name, emptyok=('w' in mode), - createmode=createmode) - self._fp = posixfile(self.temp, mode) + self.__name = name # permanent name + self._tempname = mktempcopy(name, emptyok=('w' in mode), + createmode=createmode) + self._fp = posixfile(self._tempname, mode) - def __getattr__(self, name): - return getattr(self._fp, name) + # delegated methods + self.write = self._fp.write + self.fileno = self._fp.fileno def rename(self): if not self._fp.closed: self._fp.close() - rename(self.temp, localpath(self.__name)) + rename(self._tempname, localpath(self.__name)) def close(self): - if not self._fp: - return if not self._fp.closed: try: - os.unlink(self.temp) - except: pass + os.unlink(self._tempname) + except OSError: + pass self._fp.close() def __del__(self): - self.close() + if hasattr(self, '_fp'): # constructor actually did something + self.close() def makedirs(name, mode=None): """recursive directory creation with parent mode inheritance""" @@ -869,97 +771,26 @@ makedirs(parent, mode) makedirs(name, mode) -class opener(object): - """Open files relative to a base directory - - This class is used to hide the details of COW semantics and - remote file access from higher level code. - """ - def __init__(self, base, audit=True): - self.base = base - if audit: - self.auditor = path_auditor(base) - else: - self.auditor = always - self.createmode = None - self._trustnlink = None - - @propertycache - def _can_symlink(self): - return checklink(self.base) - - def _fixfilemode(self, name): - if self.createmode is None: - return - os.chmod(name, self.createmode & 0666) - - def __call__(self, path, mode="r", text=False, atomictemp=False): - self.auditor(path) - f = os.path.join(self.base, path) - - if not text and "b" not in mode: - mode += "b" # for that other OS +def readfile(path): + fp = open(path, 'rb') + try: + return fp.read() + finally: + fp.close() - nlink = -1 - dirname, basename = os.path.split(f) - # If basename is empty, then the path is malformed because it points - # to a directory. Let the posixfile() call below raise IOError. - if basename and mode not in ('r', 'rb'): - if atomictemp: - if not os.path.isdir(dirname): - makedirs(dirname, self.createmode) - return atomictempfile(f, mode, self.createmode) - try: - if 'w' in mode: - unlink(f) - nlink = 0 - else: - # nlinks() may behave differently for files on Windows - # shares if the file is open. - fd = posixfile(f) - nlink = nlinks(f) - if nlink < 1: - nlink = 2 # force mktempcopy (issue1922) - fd.close() - except (OSError, IOError), e: - if e.errno != errno.ENOENT: - raise - nlink = 0 - if not os.path.isdir(dirname): - makedirs(dirname, self.createmode) - if nlink > 0: - if self._trustnlink is None: - self._trustnlink = nlink > 1 or checknlink(f) - if nlink > 1 or not self._trustnlink: - rename(mktempcopy(f), f) - fp = posixfile(f, mode) - if nlink == 0: - self._fixfilemode(f) - return fp +def writefile(path, text): + fp = open(path, 'wb') + try: + fp.write(text) + finally: + fp.close() - def symlink(self, src, dst): - self.auditor(dst) - linkname = os.path.join(self.base, dst) - try: - os.unlink(linkname) - except OSError: - pass - - dirname = os.path.dirname(linkname) - if not os.path.exists(dirname): - makedirs(dirname, self.createmode) - - if self._can_symlink: - try: - os.symlink(src, linkname) - except OSError, err: - raise OSError(err.errno, _('could not symlink to %r: %s') % - (src, err.strerror), linkname) - else: - f = self(dst, "w") - f.write(src) - f.close() - self._fixfilemode(dst) +def appendfile(path, text): + fp = open(path, 'ab') + try: + fp.write(text) + finally: + fp.close() class chunkbuffer(object): """Allow arbitrary sized chunks of data to be efficiently read from an @@ -1123,7 +954,6 @@ # fill out defaults now = makedate() defaults = {} - nowmap = {} for part in ("d", "mb", "yY", "HI", "M", "S"): # this piece is for rounding the specific end of unknowns b = bias.get(part) @@ -1204,10 +1034,17 @@ return parsedate(date, extendeddateformats, d)[0] date = date.strip() - if date[0] == "<": + + if not date: + raise Abort(_("dates cannot consist entirely of whitespace")) + elif date[0] == "<": + if not date[1:]: + raise Abort(_("invalid day spec, use '<DATE'")) when = upper(date[1:]) return lambda x: x <= when elif date[0] == ">": + if not date[1:]: + raise Abort(_("invalid day spec, use '>DATE'")) when = lower(date[1:]) return lambda x: x >= when elif date[0] == "-": @@ -1215,6 +1052,9 @@ days = int(date[1:]) except ValueError: raise Abort(_("invalid day spec: %s") % date[1:]) + if days < 0: + raise Abort(_("%s must be nonnegative (see 'hg help dates')") + % date[1:]) when = makedate()[0] - days * 3600 * 24 return lambda x: x >= when elif " to " in date: @@ -1266,86 +1106,6 @@ except (UnicodeDecodeError, UnicodeEncodeError): return _ellipsis(text, maxlength)[0] -def walkrepos(path, followsym=False, seen_dirs=None, recurse=False): - '''yield every hg repository under path, recursively.''' - def errhandler(err): - if err.filename == path: - raise err - if followsym and hasattr(os.path, 'samestat'): - def _add_dir_if_not_there(dirlst, dirname): - match = False - samestat = os.path.samestat - dirstat = os.stat(dirname) - for lstdirstat in dirlst: - if samestat(dirstat, lstdirstat): - match = True - break - if not match: - dirlst.append(dirstat) - return not match - else: - followsym = False - - if (seen_dirs is None) and followsym: - seen_dirs = [] - _add_dir_if_not_there(seen_dirs, path) - for root, dirs, files in os.walk(path, topdown=True, onerror=errhandler): - dirs.sort() - if '.hg' in dirs: - yield root # found a repository - qroot = os.path.join(root, '.hg', 'patches') - if os.path.isdir(os.path.join(qroot, '.hg')): - yield qroot # we have a patch queue repo here - if recurse: - # avoid recursing inside the .hg directory - dirs.remove('.hg') - else: - dirs[:] = [] # don't descend further - elif followsym: - newdirs = [] - for d in dirs: - fname = os.path.join(root, d) - if _add_dir_if_not_there(seen_dirs, fname): - if os.path.islink(fname): - for hgname in walkrepos(fname, True, seen_dirs): - yield hgname - else: - newdirs.append(d) - dirs[:] = newdirs - -_rcpath = None - -def os_rcpath(): - '''return default os-specific hgrc search path''' - path = system_rcpath() - path.extend(user_rcpath()) - path = [os.path.normpath(f) for f in path] - return path - -def rcpath(): - '''return hgrc search path. if env var HGRCPATH is set, use it. - for each item in path, if directory, use files ending in .rc, - else use item. - make HGRCPATH empty to only look in .hg/hgrc of current repo. - if no HGRCPATH, use default os-specific path.''' - global _rcpath - if _rcpath is None: - if 'HGRCPATH' in os.environ: - _rcpath = [] - for p in os.environ['HGRCPATH'].split(os.pathsep): - if not p: - continue - p = expandpath(p) - if os.path.isdir(p): - for f, kind in osutil.listdir(p): - if f.endswith('.rc'): - _rcpath.append(os.path.join(p, f)) - else: - _rcpath.append(p) - else: - _rcpath = os_rcpath() - return _rcpath - def bytecount(nbytes): '''return byte count formatted as readable string, with units''' @@ -1367,26 +1127,6 @@ return format % (nbytes / float(divisor)) return units[-1][2] % nbytes -def drop_scheme(scheme, path): - sc = scheme + ':' - if path.startswith(sc): - path = path[len(sc):] - if path.startswith('//'): - if scheme == 'file': - i = path.find('/', 2) - if i == -1: - return '' - # On Windows, absolute paths are rooted at the current drive - # root. On POSIX they are rooted at the file system root. - if os.name == 'nt': - droot = os.path.splitdrive(os.getcwd())[0] + '/' - path = os.path.join(droot, path[i + 1:]) - else: - path = path[i:] - else: - path = path[2:] - return path - def uirepr(s): # Avoid double backslash in Windows path repr() return repr(s).replace('\\\\', '\\') @@ -1459,7 +1199,7 @@ to avoid things opening new shell windows like batch files, so we get either the python call or current executable. """ - if main_is_frozen(): + if mainfrozen(): return [sys.executable] return gethgcmd() @@ -1564,3 +1304,296 @@ If s is not a valid boolean, returns None. """ return _booleans.get(s.lower(), None) + +_hexdig = '0123456789ABCDEFabcdef' +_hextochr = dict((a + b, chr(int(a + b, 16))) + for a in _hexdig for b in _hexdig) + +def _urlunquote(s): + """unquote('abc%20def') -> 'abc def'.""" + res = s.split('%') + # fastpath + if len(res) == 1: + return s + s = res[0] + for item in res[1:]: + try: + s += _hextochr[item[:2]] + item[2:] + except KeyError: + s += '%' + item + except UnicodeDecodeError: + s += unichr(int(item[:2], 16)) + item[2:] + return s + +class url(object): + r"""Reliable URL parser. + + This parses URLs and provides attributes for the following + components: + + <scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment> + + Missing components are set to None. The only exception is + fragment, which is set to '' if present but empty. + + If parsefragment is False, fragment is included in query. If + parsequery is False, query is included in path. If both are + False, both fragment and query are included in path. + + See http://www.ietf.org/rfc/rfc2396.txt for more information. + + Note that for backward compatibility reasons, bundle URLs do not + take host names. That means 'bundle://../' has a path of '../'. + + Examples: + + >>> url('http://www.ietf.org/rfc/rfc2396.txt') + <url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'> + >>> url('ssh://[::1]:2200//home/joe/repo') + <url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'> + >>> url('file:///home/joe/repo') + <url scheme: 'file', path: '/home/joe/repo'> + >>> url('bundle:foo') + <url scheme: 'bundle', path: 'foo'> + >>> url('bundle://../foo') + <url scheme: 'bundle', path: '../foo'> + >>> url(r'c:\foo\bar') + <url path: 'c:\\foo\\bar'> + + Authentication credentials: + + >>> url('ssh://joe:xyz@x/repo') + <url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'> + >>> url('ssh://joe@x/repo') + <url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'> + + Query strings and fragments: + + >>> url('http://host/a?b#c') + <url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'> + >>> url('http://host/a?b#c', parsequery=False, parsefragment=False) + <url scheme: 'http', host: 'host', path: 'a?b#c'> + """ + + _safechars = "!~*'()+" + _safepchars = "/!~*'()+" + _matchscheme = re.compile(r'^[a-zA-Z0-9+.\-]+:').match + + def __init__(self, path, parsequery=True, parsefragment=True): + # We slowly chomp away at path until we have only the path left + self.scheme = self.user = self.passwd = self.host = None + self.port = self.path = self.query = self.fragment = None + self._localpath = True + self._hostport = '' + self._origpath = path + + # special case for Windows drive letters + if hasdriveletter(path): + self.path = path + return + + # For compatibility reasons, we can't handle bundle paths as + # normal URLS + if path.startswith('bundle:'): + self.scheme = 'bundle' + path = path[7:] + if path.startswith('//'): + path = path[2:] + self.path = path + return + + if self._matchscheme(path): + parts = path.split(':', 1) + if parts[0]: + self.scheme, path = parts + self._localpath = False + + if not path: + path = None + if self._localpath: + self.path = '' + return + else: + if parsefragment and '#' in path: + path, self.fragment = path.split('#', 1) + if not path: + path = None + if self._localpath: + self.path = path + return + + if parsequery and '?' in path: + path, self.query = path.split('?', 1) + if not path: + path = None + if not self.query: + self.query = None + + # // is required to specify a host/authority + if path and path.startswith('//'): + parts = path[2:].split('/', 1) + if len(parts) > 1: + self.host, path = parts + path = path + else: + self.host = parts[0] + path = None + if not self.host: + self.host = None + if path: + path = '/' + path + + if self.host and '@' in self.host: + self.user, self.host = self.host.rsplit('@', 1) + if ':' in self.user: + self.user, self.passwd = self.user.split(':', 1) + if not self.host: + self.host = None + + # Don't split on colons in IPv6 addresses without ports + if (self.host and ':' in self.host and + not (self.host.startswith('[') and self.host.endswith(']'))): + self._hostport = self.host + self.host, self.port = self.host.rsplit(':', 1) + if not self.host: + self.host = None + + if (self.host and self.scheme == 'file' and + self.host not in ('localhost', '127.0.0.1', '[::1]')): + raise Abort(_('file:// URLs can only refer to localhost')) + + self.path = path + + for a in ('user', 'passwd', 'host', 'port', + 'path', 'query', 'fragment'): + v = getattr(self, a) + if v is not None: + setattr(self, a, _urlunquote(v)) + + def __repr__(self): + attrs = [] + for a in ('scheme', 'user', 'passwd', 'host', 'port', 'path', + 'query', 'fragment'): + v = getattr(self, a) + if v is not None: + attrs.append('%s: %r' % (a, v)) + return '<url %s>' % ', '.join(attrs) + + def __str__(self): + r"""Join the URL's components back into a URL string. + + Examples: + + >>> str(url('http://user:pw@host:80/?foo#bar')) + 'http://user:pw@host:80/?foo#bar' + >>> str(url('ssh://user:pw@[::1]:2200//home/joe#')) + 'ssh://user:pw@[::1]:2200//home/joe#' + >>> str(url('http://localhost:80//')) + 'http://localhost:80//' + >>> str(url('http://localhost:80/')) + 'http://localhost:80/' + >>> str(url('http://localhost:80')) + 'http://localhost:80/' + >>> str(url('bundle:foo')) + 'bundle:foo' + >>> str(url('bundle://../foo')) + 'bundle:../foo' + >>> str(url('path')) + 'path' + >>> str(url('file:///tmp/foo/bar')) + 'file:///tmp/foo/bar' + >>> print url(r'bundle:foo\bar') + bundle:foo\bar + """ + if self._localpath: + s = self.path + if self.scheme == 'bundle': + s = 'bundle:' + s + if self.fragment: + s += '#' + self.fragment + return s + + s = self.scheme + ':' + if self.user or self.passwd or self.host: + s += '//' + elif self.scheme and (not self.path or self.path.startswith('/')): + s += '//' + if self.user: + s += urllib.quote(self.user, safe=self._safechars) + if self.passwd: + s += ':' + urllib.quote(self.passwd, safe=self._safechars) + if self.user or self.passwd: + s += '@' + if self.host: + if not (self.host.startswith('[') and self.host.endswith(']')): + s += urllib.quote(self.host) + else: + s += self.host + if self.port: + s += ':' + urllib.quote(self.port) + if self.host: + s += '/' + if self.path: + s += urllib.quote(self.path, safe=self._safepchars) + if self.query: + s += '?' + urllib.quote(self.query, safe=self._safepchars) + if self.fragment is not None: + s += '#' + urllib.quote(self.fragment, safe=self._safepchars) + return s + + def authinfo(self): + user, passwd = self.user, self.passwd + try: + self.user, self.passwd = None, None + s = str(self) + finally: + self.user, self.passwd = user, passwd + if not self.user: + return (s, None) + return (s, (None, (str(self), self.host), + self.user, self.passwd or '')) + + def localpath(self): + if self.scheme == 'file' or self.scheme == 'bundle': + path = self.path or '/' + # For Windows, we need to promote hosts containing drive + # letters to paths with drive letters. + if hasdriveletter(self._hostport): + path = self._hostport + '/' + self.path + elif self.host is not None and self.path: + path = '/' + path + # We also need to handle the case of file:///C:/, which + # should return C:/, not /C:/. + elif hasdriveletter(path): + # Strip leading slash from paths with drive names + return path[1:] + return path + return self._origpath + +def hasscheme(path): + return bool(url(path).scheme) + +def hasdriveletter(path): + return path[1:2] == ':' and path[0:1].isalpha() + +def localpath(path): + return url(path, parsequery=False, parsefragment=False).localpath() + +def hidepassword(u): + '''hide user credential in a url string''' + u = url(u) + if u.passwd: + u.passwd = '***' + return str(u) + +def removeauth(u): + '''remove all authentication information from a url string''' + u = url(u) + u.user = u.passwd = None + return str(u) + +def isatty(fd): + try: + return fd.isatty() + except AttributeError: + return False