| # util.py - Mercurial utility functions and platform specific implementations |
| # |
| # Copyright 2005 K. Thananchayan <thananck@yahoo.com> |
| # Copyright 2005-2007 Matt Mackall <mpm@selenic.com> |
| # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com> |
| # |
| # This software may be used and distributed according to the terms of the |
| # GNU General Public License version 2 or any later version. |
| |
| """Mercurial utility functions and platform specific implementations. |
| |
| This contains helper routines that are independent of the SCM core and |
| hide platform-specific details from the core. |
| """ |
| |
| from i18n import _ |
| import error, osutil, encoding, collections |
| import errno, re, shutil, sys, tempfile, traceback |
| import os, time, datetime, calendar, textwrap, signal |
| import imp, socket, urllib |
| |
| if os.name == 'nt': |
| import windows as platform |
| else: |
| import posix as platform |
| |
| cachestat = platform.cachestat |
| checkexec = platform.checkexec |
| checklink = platform.checklink |
| copymode = platform.copymode |
| executablepath = platform.executablepath |
| expandglobs = platform.expandglobs |
| explainexit = platform.explainexit |
| findexe = platform.findexe |
| gethgcmd = platform.gethgcmd |
| getuser = platform.getuser |
| groupmembers = platform.groupmembers |
| groupname = platform.groupname |
| hidewindow = platform.hidewindow |
| isexec = platform.isexec |
| isowner = platform.isowner |
| localpath = platform.localpath |
| lookupreg = platform.lookupreg |
| makedir = platform.makedir |
| nlinks = platform.nlinks |
| normpath = platform.normpath |
| normcase = platform.normcase |
| openhardlinks = platform.openhardlinks |
| oslink = platform.oslink |
| parsepatchoutput = platform.parsepatchoutput |
| pconvert = platform.pconvert |
| popen = platform.popen |
| posixfile = platform.posixfile |
| quotecommand = platform.quotecommand |
| realpath = platform.realpath |
| rename = platform.rename |
| samedevice = platform.samedevice |
| samefile = platform.samefile |
| samestat = platform.samestat |
| setbinary = platform.setbinary |
| setflags = platform.setflags |
| setsignalhandler = platform.setsignalhandler |
| shellquote = platform.shellquote |
| spawndetached = platform.spawndetached |
| split = platform.split |
| sshargs = platform.sshargs |
| statfiles = getattr(osutil, 'statfiles', platform.statfiles) |
| statisexec = platform.statisexec |
| statislink = platform.statislink |
| termwidth = platform.termwidth |
| testpid = platform.testpid |
| umask = platform.umask |
| unlink = platform.unlink |
| unlinkpath = platform.unlinkpath |
| username = platform.username |
| |
| # Python compatibility |
| |
| _notset = object() |
| |
| def safehasattr(thing, attr): |
| return getattr(thing, attr, _notset) is not _notset |
| |
| def sha1(s=''): |
| ''' |
| Low-overhead wrapper around Python's SHA support |
| |
| >>> f = _fastsha1 |
| >>> a = sha1() |
| >>> a = f() |
| >>> a.hexdigest() |
| 'da39a3ee5e6b4b0d3255bfef95601890afd80709' |
| ''' |
| |
| return _fastsha1(s) |
| |
| def _fastsha1(s=''): |
| # This function will import sha1 from hashlib or sha (whichever is |
| # available) and overwrite itself with it on the first call. |
| # Subsequent calls will go directly to the imported function. |
| if sys.version_info >= (2, 5): |
| from hashlib import sha1 as _sha1 |
| else: |
| from sha import sha as _sha1 |
| global _fastsha1, sha1 |
| _fastsha1 = sha1 = _sha1 |
| return _sha1(s) |
| |
| try: |
| buffer = buffer |
| except NameError: |
| if sys.version_info[0] < 3: |
| def buffer(sliceable, offset=0): |
| return sliceable[offset:] |
| else: |
| def buffer(sliceable, offset=0): |
| return memoryview(sliceable)[offset:] |
| |
| import subprocess |
| closefds = os.name == 'posix' |
| |
| def popen2(cmd, env=None, newlines=False): |
| # Setting bufsize to -1 lets the system decide the buffer size. |
| # The default for bufsize is 0, meaning unbuffered. This leads to |
| # poor performance on Mac OS X: http://bugs.python.org/issue4194 |
| p = subprocess.Popen(cmd, shell=True, bufsize=-1, |
| close_fds=closefds, |
| stdin=subprocess.PIPE, stdout=subprocess.PIPE, |
| universal_newlines=newlines, |
| env=env) |
| return p.stdin, p.stdout |
| |
| def popen3(cmd, env=None, newlines=False): |
| stdin, stdout, stderr, p = popen4(cmd, env, newlines) |
| return stdin, stdout, stderr |
| |
| def popen4(cmd, env=None, newlines=False): |
| p = subprocess.Popen(cmd, shell=True, bufsize=-1, |
| close_fds=closefds, |
| stdin=subprocess.PIPE, stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| universal_newlines=newlines, |
| env=env) |
| return p.stdin, p.stdout, p.stderr, p |
| |
| def version(): |
| """Return version information if available.""" |
| try: |
| import __version__ |
| return __version__.version |
| except ImportError: |
| return 'unknown' |
| |
| # used by parsedate |
| defaultdateformats = ( |
| '%Y-%m-%d %H:%M:%S', |
| '%Y-%m-%d %I:%M:%S%p', |
| '%Y-%m-%d %H:%M', |
| '%Y-%m-%d %I:%M%p', |
| '%Y-%m-%d', |
| '%m-%d', |
| '%m/%d', |
| '%m/%d/%y', |
| '%m/%d/%Y', |
| '%a %b %d %H:%M:%S %Y', |
| '%a %b %d %I:%M:%S%p %Y', |
| '%a, %d %b %Y %H:%M:%S', # GNU coreutils "/bin/date --rfc-2822" |
| '%b %d %H:%M:%S %Y', |
| '%b %d %I:%M:%S%p %Y', |
| '%b %d %H:%M:%S', |
| '%b %d %I:%M:%S%p', |
| '%b %d %H:%M', |
| '%b %d %I:%M%p', |
| '%b %d %Y', |
| '%b %d', |
| '%H:%M:%S', |
| '%I:%M:%S%p', |
| '%H:%M', |
| '%I:%M%p', |
| ) |
| |
| extendeddateformats = defaultdateformats + ( |
| "%Y", |
| "%Y-%m", |
| "%b", |
| "%b %Y", |
| ) |
| |
| def cachefunc(func): |
| '''cache the result of function calls''' |
| # XXX doesn't handle keywords args |
| cache = {} |
| if func.func_code.co_argcount == 1: |
| # we gain a small amount of time because |
| # we don't need to pack/unpack the list |
| def f(arg): |
| if arg not in cache: |
| cache[arg] = func(arg) |
| return cache[arg] |
| else: |
| def f(*args): |
| if args not in cache: |
| cache[args] = func(*args) |
| return cache[args] |
| |
| return f |
| |
| try: |
| collections.deque.remove |
| deque = collections.deque |
| except AttributeError: |
| # python 2.4 lacks deque.remove |
| class deque(collections.deque): |
| def remove(self, val): |
| for i, v in enumerate(self): |
| if v == val: |
| del self[i] |
| break |
| |
| class lrucachedict(object): |
| '''cache most recent gets from or sets to this dictionary''' |
| def __init__(self, maxsize): |
| self._cache = {} |
| self._maxsize = maxsize |
| self._order = deque() |
| |
| def __getitem__(self, key): |
| value = self._cache[key] |
| self._order.remove(key) |
| self._order.append(key) |
| return value |
| |
| def __setitem__(self, key, value): |
| if key not in self._cache: |
| if len(self._cache) >= self._maxsize: |
| del self._cache[self._order.popleft()] |
| else: |
| self._order.remove(key) |
| self._cache[key] = value |
| self._order.append(key) |
| |
| def __contains__(self, key): |
| return key in self._cache |
| |
| def lrucachefunc(func): |
| '''cache most recent results of function calls''' |
| cache = {} |
| order = deque() |
| if func.func_code.co_argcount == 1: |
| def f(arg): |
| if arg not in cache: |
| if len(cache) > 20: |
| del cache[order.popleft()] |
| cache[arg] = func(arg) |
| else: |
| order.remove(arg) |
| order.append(arg) |
| return cache[arg] |
| else: |
| def f(*args): |
| if args not in cache: |
| if len(cache) > 20: |
| del cache[order.popleft()] |
| cache[args] = func(*args) |
| else: |
| order.remove(args) |
| order.append(args) |
| return cache[args] |
| |
| return f |
| |
| class propertycache(object): |
| def __init__(self, func): |
| self.func = func |
| self.name = func.__name__ |
| def __get__(self, obj, type=None): |
| result = self.func(obj) |
| self.cachevalue(obj, result) |
| return result |
| |
| def cachevalue(self, obj, value): |
| setattr(obj, self.name, value) |
| |
| def pipefilter(s, cmd): |
| '''filter string S through command CMD, returning its output''' |
| p = subprocess.Popen(cmd, shell=True, close_fds=closefds, |
| stdin=subprocess.PIPE, stdout=subprocess.PIPE) |
| pout, perr = p.communicate(s) |
| return pout |
| |
| def tempfilter(s, cmd): |
| '''filter string S through a pair of temporary files with CMD. |
| CMD is used as a template to create the real command to be run, |
| with the strings INFILE and OUTFILE replaced by the real names of |
| the temporary files generated.''' |
| inname, outname = None, None |
| try: |
| infd, inname = tempfile.mkstemp(prefix='hg-filter-in-') |
| fp = os.fdopen(infd, 'wb') |
| fp.write(s) |
| fp.close() |
| outfd, outname = tempfile.mkstemp(prefix='hg-filter-out-') |
| os.close(outfd) |
| cmd = cmd.replace('INFILE', inname) |
| cmd = cmd.replace('OUTFILE', outname) |
| code = os.system(cmd) |
| if sys.platform == 'OpenVMS' and code & 1: |
| code = 0 |
| if code: |
| raise Abort(_("command '%s' failed: %s") % |
| (cmd, explainexit(code))) |
| fp = open(outname, 'rb') |
| r = fp.read() |
| fp.close() |
| return r |
| finally: |
| try: |
| if inname: |
| os.unlink(inname) |
| except OSError: |
| pass |
| try: |
| if outname: |
| os.unlink(outname) |
| except OSError: |
| pass |
| |
| filtertable = { |
| 'tempfile:': tempfilter, |
| 'pipe:': pipefilter, |
| } |
| |
| def filter(s, cmd): |
| "filter a string through a command that transforms its input to its output" |
| for name, fn in filtertable.iteritems(): |
| if cmd.startswith(name): |
| return fn(s, cmd[len(name):].lstrip()) |
| return pipefilter(s, cmd) |
| |
| def binary(s): |
| """return true if a string is binary data""" |
| return bool(s and '\0' in s) |
| |
| def increasingchunks(source, min=1024, max=65536): |
| '''return no less than min bytes per chunk while data remains, |
| doubling min after each chunk until it reaches max''' |
| def log2(x): |
| if not x: |
| return 0 |
| i = 0 |
| while x: |
| x >>= 1 |
| i += 1 |
| return i - 1 |
| |
| buf = [] |
| blen = 0 |
| for chunk in source: |
| buf.append(chunk) |
| blen += len(chunk) |
| if blen >= min: |
| if min < max: |
| min = min << 1 |
| nmin = 1 << log2(blen) |
| if nmin > min: |
| min = nmin |
| if min > max: |
| min = max |
| yield ''.join(buf) |
| blen = 0 |
| buf = [] |
| if buf: |
| yield ''.join(buf) |
| |
| Abort = error.Abort |
| |
| def always(fn): |
| return True |
| |
| def never(fn): |
| return False |
| |
| def pathto(root, n1, n2): |
| '''return the relative path from one place to another. |
| root should use os.sep to separate directories |
| n1 should use os.sep to separate directories |
| n2 should use "/" to separate directories |
| returns an os.sep-separated path. |
| |
| If n1 is a relative path, it's assumed it's |
| relative to root. |
| n2 should always be relative to root. |
| ''' |
| if not n1: |
| return localpath(n2) |
| if os.path.isabs(n1): |
| if os.path.splitdrive(root)[0] != os.path.splitdrive(n1)[0]: |
| return os.path.join(root, localpath(n2)) |
| n2 = '/'.join((pconvert(root), n2)) |
| a, b = splitpath(n1), n2.split('/') |
| a.reverse() |
| b.reverse() |
| while a and b and a[-1] == b[-1]: |
| a.pop() |
| b.pop() |
| b.reverse() |
| return os.sep.join((['..'] * len(a)) + b) or '.' |
| |
| _hgexecutable = None |
| |
| def mainfrozen(): |
| """return True if we are a frozen executable. |
| |
| The code supports py2exe (most common, Windows only) and tools/freeze |
| (portable, not much used). |
| """ |
| return (safehasattr(sys, "frozen") or # new py2exe |
| safehasattr(sys, "importers") or # old py2exe |
| imp.is_frozen("__main__")) # tools/freeze |
| |
| def hgexecutable(): |
| """return location of the 'hg' executable. |
| |
| Defaults to $HG or 'hg' in the search path. |
| """ |
| if _hgexecutable is None: |
| hg = os.environ.get('HG') |
| mainmod = sys.modules['__main__'] |
| if hg: |
| _sethgexecutable(hg) |
| elif mainfrozen(): |
| _sethgexecutable(sys.executable) |
| elif os.path.basename(getattr(mainmod, '__file__', '')) == 'hg': |
| _sethgexecutable(mainmod.__file__) |
| else: |
| exe = findexe('hg') or os.path.basename(sys.argv[0]) |
| _sethgexecutable(exe) |
| return _hgexecutable |
| |
| def _sethgexecutable(path): |
| """set location of the 'hg' executable""" |
| global _hgexecutable |
| _hgexecutable = path |
| |
| def system(cmd, environ={}, cwd=None, onerr=None, errprefix=None, out=None): |
| '''enhanced shell command execution. |
| run with environment maybe modified, maybe in different dir. |
| |
| if command fails and onerr is None, return status. if ui object, |
| print error message and return status, else raise onerr object as |
| exception. |
| |
| if out is specified, it is assumed to be a file-like object that has a |
| write() method. stdout and stderr will be redirected to out.''' |
| try: |
| sys.stdout.flush() |
| except Exception: |
| pass |
| def py2shell(val): |
| 'convert python object into string that is useful to shell' |
| if val is None or val is False: |
| return '0' |
| if val is True: |
| return '1' |
| return str(val) |
| origcmd = cmd |
| cmd = quotecommand(cmd) |
| if sys.platform == 'plan9': |
| # subprocess kludge to work around issues in half-baked Python |
| # ports, notably bichued/python: |
| if not cwd is None: |
| os.chdir(cwd) |
| rc = os.system(cmd) |
| else: |
| env = dict(os.environ) |
| env.update((k, py2shell(v)) for k, v in environ.iteritems()) |
| env['HG'] = hgexecutable() |
| if out is None or out == sys.__stdout__: |
| rc = subprocess.call(cmd, shell=True, close_fds=closefds, |
| env=env, cwd=cwd) |
| else: |
| proc = subprocess.Popen(cmd, shell=True, close_fds=closefds, |
| env=env, cwd=cwd, stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT) |
| for line in proc.stdout: |
| out.write(line) |
| proc.wait() |
| rc = proc.returncode |
| if sys.platform == 'OpenVMS' and rc & 1: |
| rc = 0 |
| if rc and onerr: |
| errmsg = '%s %s' % (os.path.basename(origcmd.split(None, 1)[0]), |
| explainexit(rc)[0]) |
| if errprefix: |
| errmsg = '%s: %s' % (errprefix, errmsg) |
| try: |
| onerr.warn(errmsg + '\n') |
| except AttributeError: |
| raise onerr(errmsg) |
| return rc |
| |
| def checksignature(func): |
| '''wrap a function with code to check for calling errors''' |
| def check(*args, **kwargs): |
| try: |
| return func(*args, **kwargs) |
| except TypeError: |
| if len(traceback.extract_tb(sys.exc_info()[2])) == 1: |
| raise error.SignatureError |
| raise |
| |
| return check |
| |
| def copyfile(src, dest): |
| "copy a file, preserving mode and atime/mtime" |
| if os.path.lexists(dest): |
| unlink(dest) |
| if os.path.islink(src): |
| os.symlink(os.readlink(src), dest) |
| else: |
| try: |
| shutil.copyfile(src, dest) |
| shutil.copymode(src, dest) |
| except shutil.Error, inst: |
| raise Abort(str(inst)) |
| |
| def copyfiles(src, dst, hardlink=None): |
| """Copy a directory tree using hardlinks if possible""" |
| |
| if hardlink is None: |
| hardlink = (os.stat(src).st_dev == |
| os.stat(os.path.dirname(dst)).st_dev) |
| |
| num = 0 |
| if os.path.isdir(src): |
| os.mkdir(dst) |
| for name, kind in osutil.listdir(src): |
| srcname = os.path.join(src, name) |
| dstname = os.path.join(dst, name) |
| hardlink, n = copyfiles(srcname, dstname, hardlink) |
| num += n |
| else: |
| if hardlink: |
| try: |
| oslink(src, dst) |
| except (IOError, OSError): |
| hardlink = False |
| shutil.copy(src, dst) |
| else: |
| shutil.copy(src, dst) |
| num += 1 |
| |
| return hardlink, num |
| |
| _winreservednames = '''con prn aux nul |
| com1 com2 com3 com4 com5 com6 com7 com8 com9 |
| lpt1 lpt2 lpt3 lpt4 lpt5 lpt6 lpt7 lpt8 lpt9'''.split() |
| _winreservedchars = ':*?"<>|' |
| def checkwinfilename(path): |
| '''Check that the base-relative path is a valid filename on Windows. |
| Returns None if the path is ok, or a UI string describing the problem. |
| |
| >>> checkwinfilename("just/a/normal/path") |
| >>> checkwinfilename("foo/bar/con.xml") |
| "filename contains 'con', which is reserved on Windows" |
| >>> checkwinfilename("foo/con.xml/bar") |
| "filename contains 'con', which is reserved on Windows" |
| >>> checkwinfilename("foo/bar/xml.con") |
| >>> checkwinfilename("foo/bar/AUX/bla.txt") |
| "filename contains 'AUX', which is reserved on Windows" |
| >>> checkwinfilename("foo/bar/bla:.txt") |
| "filename contains ':', which is reserved on Windows" |
| >>> checkwinfilename("foo/bar/b\07la.txt") |
| "filename contains '\\\\x07', which is invalid on Windows" |
| >>> checkwinfilename("foo/bar/bla ") |
| "filename ends with ' ', which is not allowed on Windows" |
| >>> checkwinfilename("../bar") |
| ''' |
| for n in path.replace('\\', '/').split('/'): |
| if not n: |
| continue |
| for c in n: |
| if c in _winreservedchars: |
| return _("filename contains '%s', which is reserved " |
| "on Windows") % c |
| if ord(c) <= 31: |
| return _("filename contains %r, which is invalid " |
| "on Windows") % c |
| base = n.split('.')[0] |
| if base and base.lower() in _winreservednames: |
| return _("filename contains '%s', which is reserved " |
| "on Windows") % base |
| t = n[-1] |
| if t in '. ' and n not in '..': |
| return _("filename ends with '%s', which is not allowed " |
| "on Windows") % t |
| |
| if os.name == 'nt': |
| checkosfilename = checkwinfilename |
| else: |
| checkosfilename = platform.checkosfilename |
| |
| def makelock(info, pathname): |
| try: |
| return os.symlink(info, pathname) |
| except OSError, why: |
| if why.errno == errno.EEXIST: |
| raise |
| except AttributeError: # no symlink in os |
| pass |
| |
| ld = os.open(pathname, os.O_CREAT | os.O_WRONLY | os.O_EXCL) |
| os.write(ld, info) |
| os.close(ld) |
| |
| def readlock(pathname): |
| try: |
| return os.readlink(pathname) |
| except OSError, why: |
| if why.errno not in (errno.EINVAL, errno.ENOSYS): |
| raise |
| except AttributeError: # no symlink in os |
| pass |
| fp = posixfile(pathname) |
| r = fp.read() |
| fp.close() |
| return r |
| |
| def fstat(fp): |
| '''stat file object that may not have fileno method.''' |
| try: |
| return os.fstat(fp.fileno()) |
| except AttributeError: |
| return os.stat(fp.name) |
| |
| # File system features |
| |
| def checkcase(path): |
| """ |
| Return true if the given path is on a case-sensitive filesystem |
| |
| Requires a path (like /foo/.hg) ending with a foldable final |
| directory component. |
| """ |
| s1 = os.stat(path) |
| d, b = os.path.split(path) |
| b2 = b.upper() |
| if b == b2: |
| b2 = b.lower() |
| if b == b2: |
| return True # no evidence against case sensitivity |
| p2 = os.path.join(d, b2) |
| try: |
| s2 = os.stat(p2) |
| if s2 == s1: |
| return False |
| return True |
| except OSError: |
| return True |
| |
| try: |
| import re2 |
| _re2 = None |
| except ImportError: |
| _re2 = False |
| |
| def compilere(pat, flags=0): |
| '''Compile a regular expression, using re2 if possible |
| |
| For best performance, use only re2-compatible regexp features. The |
| only flags from the re module that are re2-compatible are |
| IGNORECASE and MULTILINE.''' |
| global _re2 |
| if _re2 is None: |
| try: |
| re2.compile |
| _re2 = True |
| except ImportError: |
| _re2 = False |
| if _re2 and (flags & ~(re.IGNORECASE | re.MULTILINE)) == 0: |
| if flags & re.IGNORECASE: |
| pat = '(?i)' + pat |
| if flags & re.MULTILINE: |
| pat = '(?m)' + pat |
| try: |
| return re2.compile(pat) |
| except re2.error: |
| pass |
| return re.compile(pat, flags) |
| |
| _fspathcache = {} |
| def fspath(name, root): |
| '''Get name in the case stored in the filesystem |
| |
| The name should be relative to root, and be normcase-ed for efficiency. |
| |
| Note that this function is unnecessary, and should not be |
| called, for case-sensitive filesystems (simply because it's expensive). |
| |
| The root should be normcase-ed, too. |
| ''' |
| def find(p, contents): |
| for n in contents: |
| if normcase(n) == p: |
| return n |
| return None |
| |
| seps = os.sep |
| if os.altsep: |
| seps = seps + os.altsep |
| # Protect backslashes. This gets silly very quickly. |
| seps.replace('\\','\\\\') |
| pattern = re.compile(r'([^%s]+)|([%s]+)' % (seps, seps)) |
| dir = os.path.normpath(root) |
| result = [] |
| for part, sep in pattern.findall(name): |
| if sep: |
| result.append(sep) |
| continue |
| |
| if dir not in _fspathcache: |
| _fspathcache[dir] = os.listdir(dir) |
| contents = _fspathcache[dir] |
| |
| found = find(part, contents) |
| if not found: |
| # retry "once per directory" per "dirstate.walk" which |
| # may take place for each patches of "hg qpush", for example |
| contents = os.listdir(dir) |
| _fspathcache[dir] = contents |
| found = find(part, contents) |
| |
| result.append(found or part) |
| dir = os.path.join(dir, part) |
| |
| return ''.join(result) |
| |
| def checknlink(testfile): |
| '''check whether hardlink count reporting works properly''' |
| |
| # testfile may be open, so we need a separate file for checking to |
| # work around issue2543 (or testfile may get lost on Samba shares) |
| f1 = testfile + ".hgtmp1" |
| if os.path.lexists(f1): |
| return False |
| try: |
| posixfile(f1, 'w').close() |
| except IOError: |
| return False |
| |
| f2 = testfile + ".hgtmp2" |
| fd = None |
| try: |
| try: |
| oslink(f1, f2) |
| except OSError: |
| return False |
| |
| # nlinks() may behave differently for files on Windows shares if |
| # the file is open. |
| fd = posixfile(f2) |
| return nlinks(f2) > 1 |
| finally: |
| if fd is not None: |
| fd.close() |
| for f in (f1, f2): |
| try: |
| os.unlink(f) |
| except OSError: |
| pass |
| |
| def endswithsep(path): |
| '''Check path ends with os.sep or os.altsep.''' |
| return path.endswith(os.sep) or os.altsep and path.endswith(os.altsep) |
| |
| def splitpath(path): |
| '''Split path by os.sep. |
| Note that this function does not use os.altsep because this is |
| an alternative of simple "xxx.split(os.sep)". |
| It is recommended to use os.path.normpath() before using this |
| function if need.''' |
| return path.split(os.sep) |
| |
| def gui(): |
| '''Are we running in a GUI?''' |
| if sys.platform == 'darwin': |
| if 'SSH_CONNECTION' in os.environ: |
| # handle SSH access to a box where the user is logged in |
| return False |
| elif getattr(osutil, 'isgui', None): |
| # check if a CoreGraphics session is available |
| return osutil.isgui() |
| else: |
| # pure build; use a safe default |
| return True |
| else: |
| return os.name == "nt" or os.environ.get("DISPLAY") |
| |
| def mktempcopy(name, emptyok=False, createmode=None): |
| """Create a temporary file with the same contents from name |
| |
| The permission bits are copied from the original file. |
| |
| If the temporary file is going to be truncated immediately, you |
| can use emptyok=True as an optimization. |
| |
| Returns the name of the temporary file. |
| """ |
| d, fn = os.path.split(name) |
| fd, temp = tempfile.mkstemp(prefix='.%s-' % fn, dir=d) |
| os.close(fd) |
| # Temporary files are created with mode 0600, which is usually not |
| # what we want. If the original file already exists, just copy |
| # its mode. Otherwise, manually obey umask. |
| copymode(name, temp, createmode) |
| if emptyok: |
| return temp |
| try: |
| try: |
| ifp = posixfile(name, "rb") |
| except IOError, inst: |
| if inst.errno == errno.ENOENT: |
| return temp |
| if not getattr(inst, 'filename', None): |
| inst.filename = name |
| raise |
| ofp = posixfile(temp, "wb") |
| for chunk in filechunkiter(ifp): |
| ofp.write(chunk) |
| ifp.close() |
| ofp.close() |
| except: # re-raises |
| try: os.unlink(temp) |
| except OSError: pass |
| raise |
| return temp |
| |
| class atomictempfile(object): |
| '''writable file object that atomically updates a file |
| |
| All writes will go to a temporary copy of the original file. Call |
| close() when you are done writing, and atomictempfile will rename |
| the temporary copy to the original name, making the changes |
| visible. If the object is destroyed without being closed, all your |
| writes are discarded. |
| ''' |
| def __init__(self, name, mode='w+b', createmode=None): |
| self.__name = name # permanent name |
| self._tempname = mktempcopy(name, emptyok=('w' in mode), |
| createmode=createmode) |
| self._fp = posixfile(self._tempname, mode) |
| |
| # delegated methods |
| self.write = self._fp.write |
| self.seek = self._fp.seek |
| self.tell = self._fp.tell |
| self.fileno = self._fp.fileno |
| |
| def close(self): |
| if not self._fp.closed: |
| self._fp.close() |
| rename(self._tempname, localpath(self.__name)) |
| |
| def discard(self): |
| if not self._fp.closed: |
| try: |
| os.unlink(self._tempname) |
| except OSError: |
| pass |
| self._fp.close() |
| |
| def __del__(self): |
| if safehasattr(self, '_fp'): # constructor actually did something |
| self.discard() |
| |
| def makedirs(name, mode=None, notindexed=False): |
| """recursive directory creation with parent mode inheritance""" |
| try: |
| makedir(name, notindexed) |
| except OSError, err: |
| if err.errno == errno.EEXIST: |
| return |
| if err.errno != errno.ENOENT or not name: |
| raise |
| parent = os.path.dirname(os.path.abspath(name)) |
| if parent == name: |
| raise |
| makedirs(parent, mode, notindexed) |
| makedir(name, notindexed) |
| if mode is not None: |
| os.chmod(name, mode) |
| |
| def ensuredirs(name, mode=None): |
| """race-safe recursive directory creation""" |
| if os.path.isdir(name): |
| return |
| parent = os.path.dirname(os.path.abspath(name)) |
| if parent != name: |
| ensuredirs(parent, mode) |
| try: |
| os.mkdir(name) |
| except OSError, err: |
| if err.errno == errno.EEXIST and os.path.isdir(name): |
| # someone else seems to have won a directory creation race |
| return |
| raise |
| if mode is not None: |
| os.chmod(name, mode) |
| |
| def readfile(path): |
| fp = open(path, 'rb') |
| try: |
| return fp.read() |
| finally: |
| fp.close() |
| |
| def writefile(path, text): |
| fp = open(path, 'wb') |
| try: |
| fp.write(text) |
| finally: |
| fp.close() |
| |
| def appendfile(path, text): |
| fp = open(path, 'ab') |
| try: |
| fp.write(text) |
| finally: |
| fp.close() |
| |
| class chunkbuffer(object): |
| """Allow arbitrary sized chunks of data to be efficiently read from an |
| iterator over chunks of arbitrary size.""" |
| |
| def __init__(self, in_iter): |
| """in_iter is the iterator that's iterating over the input chunks. |
| targetsize is how big a buffer to try to maintain.""" |
| def splitbig(chunks): |
| for chunk in chunks: |
| if len(chunk) > 2**20: |
| pos = 0 |
| while pos < len(chunk): |
| end = pos + 2 ** 18 |
| yield chunk[pos:end] |
| pos = end |
| else: |
| yield chunk |
| self.iter = splitbig(in_iter) |
| self._queue = deque() |
| |
| def read(self, l): |
| """Read L bytes of data from the iterator of chunks of data. |
| Returns less than L bytes if the iterator runs dry.""" |
| left = l |
| buf = [] |
| queue = self._queue |
| while left > 0: |
| # refill the queue |
| if not queue: |
| target = 2**18 |
| for chunk in self.iter: |
| queue.append(chunk) |
| target -= len(chunk) |
| if target <= 0: |
| break |
| if not queue: |
| break |
| |
| chunk = queue.popleft() |
| left -= len(chunk) |
| if left < 0: |
| queue.appendleft(chunk[left:]) |
| buf.append(chunk[:left]) |
| else: |
| buf.append(chunk) |
| |
| return ''.join(buf) |
| |
| def filechunkiter(f, size=65536, limit=None): |
| """Create a generator that produces the data in the file size |
| (default 65536) bytes at a time, up to optional limit (default is |
| to read all data). Chunks may be less than size bytes if the |
| chunk is the last chunk in the file, or the file is a socket or |
| some other type of file that sometimes reads less data than is |
| requested.""" |
| assert size >= 0 |
| assert limit is None or limit >= 0 |
| while True: |
| if limit is None: |
| nbytes = size |
| else: |
| nbytes = min(limit, size) |
| s = nbytes and f.read(nbytes) |
| if not s: |
| break |
| if limit: |
| limit -= len(s) |
| yield s |
| |
| def makedate(): |
| ct = time.time() |
| if ct < 0: |
| hint = _("check your clock") |
| raise Abort(_("negative timestamp: %d") % ct, hint=hint) |
| delta = (datetime.datetime.utcfromtimestamp(ct) - |
| datetime.datetime.fromtimestamp(ct)) |
| tz = delta.days * 86400 + delta.seconds |
| return ct, tz |
| |
| def datestr(date=None, format='%a %b %d %H:%M:%S %Y %1%2'): |
| """represent a (unixtime, offset) tuple as a localized time. |
| unixtime is seconds since the epoch, and offset is the time zone's |
| number of seconds away from UTC. if timezone is false, do not |
| append time zone to string.""" |
| t, tz = date or makedate() |
| if t < 0: |
| t = 0 # time.gmtime(lt) fails on Windows for lt < -43200 |
| tz = 0 |
| if "%1" in format or "%2" in format: |
| sign = (tz > 0) and "-" or "+" |
| minutes = abs(tz) // 60 |
| format = format.replace("%1", "%c%02d" % (sign, minutes // 60)) |
| format = format.replace("%2", "%02d" % (minutes % 60)) |
| try: |
| t = time.gmtime(float(t) - tz) |
| except ValueError: |
| # time was out of range |
| t = time.gmtime(sys.maxint) |
| s = time.strftime(format, t) |
| return s |
| |
| def shortdate(date=None): |
| """turn (timestamp, tzoff) tuple into iso 8631 date.""" |
| return datestr(date, format='%Y-%m-%d') |
| |
| def strdate(string, format, defaults=[]): |
| """parse a localized time string and return a (unixtime, offset) tuple. |
| if the string cannot be parsed, ValueError is raised.""" |
| def timezone(string): |
| tz = string.split()[-1] |
| if tz[0] in "+-" and len(tz) == 5 and tz[1:].isdigit(): |
| sign = (tz[0] == "+") and 1 or -1 |
| hours = int(tz[1:3]) |
| minutes = int(tz[3:5]) |
| return -sign * (hours * 60 + minutes) * 60 |
| if tz == "GMT" or tz == "UTC": |
| return 0 |
| return None |
| |
| # NOTE: unixtime = localunixtime + offset |
| offset, date = timezone(string), string |
| if offset is not None: |
| date = " ".join(string.split()[:-1]) |
| |
| # add missing elements from defaults |
| usenow = False # default to using biased defaults |
| for part in ("S", "M", "HI", "d", "mb", "yY"): # decreasing specificity |
| found = [True for p in part if ("%"+p) in format] |
| if not found: |
| date += "@" + defaults[part][usenow] |
| format += "@%" + part[0] |
| else: |
| # We've found a specific time element, less specific time |
| # elements are relative to today |
| usenow = True |
| |
| timetuple = time.strptime(date, format) |
| localunixtime = int(calendar.timegm(timetuple)) |
| if offset is None: |
| # local timezone |
| unixtime = int(time.mktime(timetuple)) |
| offset = unixtime - localunixtime |
| else: |
| unixtime = localunixtime + offset |
| return unixtime, offset |
| |
| def parsedate(date, formats=None, bias={}): |
| """parse a localized date/time and return a (unixtime, offset) tuple. |
| |
| The date may be a "unixtime offset" string or in one of the specified |
| formats. If the date already is a (unixtime, offset) tuple, it is returned. |
| |
| >>> parsedate(' today ') == parsedate(\ |
| datetime.date.today().strftime('%b %d')) |
| True |
| >>> parsedate( 'yesterday ') == parsedate((datetime.date.today() -\ |
| datetime.timedelta(days=1)\ |
| ).strftime('%b %d')) |
| True |
| >>> now, tz = makedate() |
| >>> strnow, strtz = parsedate('now') |
| >>> (strnow - now) < 1 |
| True |
| >>> tz == strtz |
| True |
| """ |
| if not date: |
| return 0, 0 |
| if isinstance(date, tuple) and len(date) == 2: |
| return date |
| if not formats: |
| formats = defaultdateformats |
| date = date.strip() |
| |
| if date == _('now'): |
| return makedate() |
| if date == _('today'): |
| date = datetime.date.today().strftime('%b %d') |
| elif date == _('yesterday'): |
| date = (datetime.date.today() - |
| datetime.timedelta(days=1)).strftime('%b %d') |
| |
| try: |
| when, offset = map(int, date.split(' ')) |
| except ValueError: |
| # fill out defaults |
| now = makedate() |
| defaults = {} |
| for part in ("d", "mb", "yY", "HI", "M", "S"): |
| # this piece is for rounding the specific end of unknowns |
| b = bias.get(part) |
| if b is None: |
| if part[0] in "HMS": |
| b = "00" |
| else: |
| b = "0" |
| |
| # this piece is for matching the generic end to today's date |
| n = datestr(now, "%" + part[0]) |
| |
| defaults[part] = (b, n) |
| |
| for format in formats: |
| try: |
| when, offset = strdate(date, format, defaults) |
| except (ValueError, OverflowError): |
| pass |
| else: |
| break |
| else: |
| raise Abort(_('invalid date: %r') % date) |
| # validate explicit (probably user-specified) date and |
| # time zone offset. values must fit in signed 32 bits for |
| # current 32-bit linux runtimes. timezones go from UTC-12 |
| # to UTC+14 |
| if abs(when) > 0x7fffffff: |
| raise Abort(_('date exceeds 32 bits: %d') % when) |
| if when < 0: |
| raise Abort(_('negative date value: %d') % when) |
| if offset < -50400 or offset > 43200: |
| raise Abort(_('impossible time zone offset: %d') % offset) |
| return when, offset |
| |
| def matchdate(date): |
| """Return a function that matches a given date match specifier |
| |
| Formats include: |
| |
| '{date}' match a given date to the accuracy provided |
| |
| '<{date}' on or before a given date |
| |
| '>{date}' on or after a given date |
| |
| >>> p1 = parsedate("10:29:59") |
| >>> p2 = parsedate("10:30:00") |
| >>> p3 = parsedate("10:30:59") |
| >>> p4 = parsedate("10:31:00") |
| >>> p5 = parsedate("Sep 15 10:30:00 1999") |
| >>> f = matchdate("10:30") |
| >>> f(p1[0]) |
| False |
| >>> f(p2[0]) |
| True |
| >>> f(p3[0]) |
| True |
| >>> f(p4[0]) |
| False |
| >>> f(p5[0]) |
| False |
| """ |
| |
| def lower(date): |
| d = dict(mb="1", d="1") |
| return parsedate(date, extendeddateformats, d)[0] |
| |
| def upper(date): |
| d = dict(mb="12", HI="23", M="59", S="59") |
| for days in ("31", "30", "29"): |
| try: |
| d["d"] = days |
| return parsedate(date, extendeddateformats, d)[0] |
| except Abort: |
| pass |
| d["d"] = "28" |
| return parsedate(date, extendeddateformats, d)[0] |
| |
| date = date.strip() |
| |
| if not date: |
| raise Abort(_("dates cannot consist entirely of whitespace")) |
| elif date[0] == "<": |
| if not date[1:]: |
| raise Abort(_("invalid day spec, use '<DATE'")) |
| when = upper(date[1:]) |
| return lambda x: x <= when |
| elif date[0] == ">": |
| if not date[1:]: |
| raise Abort(_("invalid day spec, use '>DATE'")) |
| when = lower(date[1:]) |
| return lambda x: x >= when |
| elif date[0] == "-": |
| try: |
| days = int(date[1:]) |
| except ValueError: |
| raise Abort(_("invalid day spec: %s") % date[1:]) |
| if days < 0: |
| raise Abort(_("%s must be nonnegative (see 'hg help dates')") |
| % date[1:]) |
| when = makedate()[0] - days * 3600 * 24 |
| return lambda x: x >= when |
| elif " to " in date: |
| a, b = date.split(" to ") |
| start, stop = lower(a), upper(b) |
| return lambda x: x >= start and x <= stop |
| else: |
| start, stop = lower(date), upper(date) |
| return lambda x: x >= start and x <= stop |
| |
| def shortuser(user): |
| """Return a short representation of a user name or email address.""" |
| f = user.find('@') |
| if f >= 0: |
| user = user[:f] |
| f = user.find('<') |
| if f >= 0: |
| user = user[f + 1:] |
| f = user.find(' ') |
| if f >= 0: |
| user = user[:f] |
| f = user.find('.') |
| if f >= 0: |
| user = user[:f] |
| return user |
| |
| def emailuser(user): |
| """Return the user portion of an email address.""" |
| f = user.find('@') |
| if f >= 0: |
| user = user[:f] |
| f = user.find('<') |
| if f >= 0: |
| user = user[f + 1:] |
| return user |
| |
| def email(author): |
| '''get email of author.''' |
| r = author.find('>') |
| if r == -1: |
| r = None |
| return author[author.find('<') + 1:r] |
| |
| def _ellipsis(text, maxlength): |
| if len(text) <= maxlength: |
| return text, False |
| else: |
| return "%s..." % (text[:maxlength - 3]), True |
| |
| def ellipsis(text, maxlength=400): |
| """Trim string to at most maxlength (default: 400) characters.""" |
| try: |
| # use unicode not to split at intermediate multi-byte sequence |
| utext, truncated = _ellipsis(text.decode(encoding.encoding), |
| maxlength) |
| if not truncated: |
| return text |
| return utext.encode(encoding.encoding) |
| except (UnicodeDecodeError, UnicodeEncodeError): |
| return _ellipsis(text, maxlength)[0] |
| |
| def unitcountfn(*unittable): |
| '''return a function that renders a readable count of some quantity''' |
| |
| def go(count): |
| for multiplier, divisor, format in unittable: |
| if count >= divisor * multiplier: |
| return format % (count / float(divisor)) |
| return unittable[-1][2] % count |
| |
| return go |
| |
| bytecount = unitcountfn( |
| (100, 1 << 30, _('%.0f GB')), |
| (10, 1 << 30, _('%.1f GB')), |
| (1, 1 << 30, _('%.2f GB')), |
| (100, 1 << 20, _('%.0f MB')), |
| (10, 1 << 20, _('%.1f MB')), |
| (1, 1 << 20, _('%.2f MB')), |
| (100, 1 << 10, _('%.0f KB')), |
| (10, 1 << 10, _('%.1f KB')), |
| (1, 1 << 10, _('%.2f KB')), |
| (1, 1, _('%.0f bytes')), |
| ) |
| |
| def uirepr(s): |
| # Avoid double backslash in Windows path repr() |
| return repr(s).replace('\\\\', '\\') |
| |
| # delay import of textwrap |
| def MBTextWrapper(**kwargs): |
| class tw(textwrap.TextWrapper): |
| """ |
| Extend TextWrapper for width-awareness. |
| |
| Neither number of 'bytes' in any encoding nor 'characters' is |
| appropriate to calculate terminal columns for specified string. |
| |
| Original TextWrapper implementation uses built-in 'len()' directly, |
| so overriding is needed to use width information of each characters. |
| |
| In addition, characters classified into 'ambiguous' width are |
| treated as wide in East Asian area, but as narrow in other. |
| |
| This requires use decision to determine width of such characters. |
| """ |
| def __init__(self, **kwargs): |
| textwrap.TextWrapper.__init__(self, **kwargs) |
| |
| # for compatibility between 2.4 and 2.6 |
| if getattr(self, 'drop_whitespace', None) is None: |
| self.drop_whitespace = kwargs.get('drop_whitespace', True) |
| |
| def _cutdown(self, ucstr, space_left): |
| l = 0 |
| colwidth = encoding.ucolwidth |
| for i in xrange(len(ucstr)): |
| l += colwidth(ucstr[i]) |
| if space_left < l: |
| return (ucstr[:i], ucstr[i:]) |
| return ucstr, '' |
| |
| # overriding of base class |
| def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width): |
| space_left = max(width - cur_len, 1) |
| |
| if self.break_long_words: |
| cut, res = self._cutdown(reversed_chunks[-1], space_left) |
| cur_line.append(cut) |
| reversed_chunks[-1] = res |
| elif not cur_line: |
| cur_line.append(reversed_chunks.pop()) |
| |
| # this overriding code is imported from TextWrapper of python 2.6 |
| # to calculate columns of string by 'encoding.ucolwidth()' |
| def _wrap_chunks(self, chunks): |
| colwidth = encoding.ucolwidth |
| |
| lines = [] |
| if self.width <= 0: |
| raise ValueError("invalid width %r (must be > 0)" % self.width) |
| |
| # Arrange in reverse order so items can be efficiently popped |
| # from a stack of chucks. |
| chunks.reverse() |
| |
| while chunks: |
| |
| # Start the list of chunks that will make up the current line. |
| # cur_len is just the length of all the chunks in cur_line. |
| cur_line = [] |
| cur_len = 0 |
| |
| # Figure out which static string will prefix this line. |
| if lines: |
| indent = self.subsequent_indent |
| else: |
| indent = self.initial_indent |
| |
| # Maximum width for this line. |
| width = self.width - len(indent) |
| |
| # First chunk on line is whitespace -- drop it, unless this |
| # is the very beginning of the text (i.e. no lines started yet). |
| if self.drop_whitespace and chunks[-1].strip() == '' and lines: |
| del chunks[-1] |
| |
| while chunks: |
| l = colwidth(chunks[-1]) |
| |
| # Can at least squeeze this chunk onto the current line. |
| if cur_len + l <= width: |
| cur_line.append(chunks.pop()) |
| cur_len += l |
| |
| # Nope, this line is full. |
| else: |
| break |
| |
| # The current line is full, and the next chunk is too big to |
| # fit on *any* line (not just this one). |
| if chunks and colwidth(chunks[-1]) > width: |
| self._handle_long_word(chunks, cur_line, cur_len, width) |
| |
| # If the last chunk on this line is all whitespace, drop it. |
| if (self.drop_whitespace and |
| cur_line and cur_line[-1].strip() == ''): |
| del cur_line[-1] |
| |
| # Convert current line back to a string and store it in list |
| # of all lines (return value). |
| if cur_line: |
| lines.append(indent + ''.join(cur_line)) |
| |
| return lines |
| |
| global MBTextWrapper |
| MBTextWrapper = tw |
| return tw(**kwargs) |
| |
| def wrap(line, width, initindent='', hangindent=''): |
| maxindent = max(len(hangindent), len(initindent)) |
| if width <= maxindent: |
| # adjust for weird terminal size |
| width = max(78, maxindent + 1) |
| line = line.decode(encoding.encoding, encoding.encodingmode) |
| initindent = initindent.decode(encoding.encoding, encoding.encodingmode) |
| hangindent = hangindent.decode(encoding.encoding, encoding.encodingmode) |
| wrapper = MBTextWrapper(width=width, |
| initial_indent=initindent, |
| subsequent_indent=hangindent) |
| return wrapper.fill(line).encode(encoding.encoding) |
| |
| def iterlines(iterator): |
| for chunk in iterator: |
| for line in chunk.splitlines(): |
| yield line |
| |
| def expandpath(path): |
| return os.path.expanduser(os.path.expandvars(path)) |
| |
| def hgcmd(): |
| """Return the command used to execute current hg |
| |
| This is different from hgexecutable() because on Windows we want |
| to avoid things opening new shell windows like batch files, so we |
| get either the python call or current executable. |
| """ |
| if mainfrozen(): |
| return [sys.executable] |
| return gethgcmd() |
| |
| def rundetached(args, condfn): |
| """Execute the argument list in a detached process. |
| |
| condfn is a callable which is called repeatedly and should return |
| True once the child process is known to have started successfully. |
| At this point, the child process PID is returned. If the child |
| process fails to start or finishes before condfn() evaluates to |
| True, return -1. |
| """ |
| # Windows case is easier because the child process is either |
| # successfully starting and validating the condition or exiting |
| # on failure. We just poll on its PID. On Unix, if the child |
| # process fails to start, it will be left in a zombie state until |
| # the parent wait on it, which we cannot do since we expect a long |
| # running process on success. Instead we listen for SIGCHLD telling |
| # us our child process terminated. |
| terminated = set() |
| def handler(signum, frame): |
| terminated.add(os.wait()) |
| prevhandler = None |
| SIGCHLD = getattr(signal, 'SIGCHLD', None) |
| if SIGCHLD is not None: |
| prevhandler = signal.signal(SIGCHLD, handler) |
| try: |
| pid = spawndetached(args) |
| while not condfn(): |
| if ((pid in terminated or not testpid(pid)) |
| and not condfn()): |
| return -1 |
| time.sleep(0.1) |
| return pid |
| finally: |
| if prevhandler is not None: |
| signal.signal(signal.SIGCHLD, prevhandler) |
| |
| try: |
| any, all = any, all |
| except NameError: |
| def any(iterable): |
| for i in iterable: |
| if i: |
| return True |
| return False |
| |
| def all(iterable): |
| for i in iterable: |
| if not i: |
| return False |
| return True |
| |
| def interpolate(prefix, mapping, s, fn=None, escape_prefix=False): |
| """Return the result of interpolating items in the mapping into string s. |
| |
| prefix is a single character string, or a two character string with |
| a backslash as the first character if the prefix needs to be escaped in |
| a regular expression. |
| |
| fn is an optional function that will be applied to the replacement text |
| just before replacement. |
| |
| escape_prefix is an optional flag that allows using doubled prefix for |
| its escaping. |
| """ |
| fn = fn or (lambda s: s) |
| patterns = '|'.join(mapping.keys()) |
| if escape_prefix: |
| patterns += '|' + prefix |
| if len(prefix) > 1: |
| prefix_char = prefix[1:] |
| else: |
| prefix_char = prefix |
| mapping[prefix_char] = prefix_char |
| r = re.compile(r'%s(%s)' % (prefix, patterns)) |
| return r.sub(lambda x: fn(mapping[x.group()[1:]]), s) |
| |
| def getport(port): |
| """Return the port for a given network service. |
| |
| If port is an integer, it's returned as is. If it's a string, it's |
| looked up using socket.getservbyname(). If there's no matching |
| service, util.Abort is raised. |
| """ |
| try: |
| return int(port) |
| except ValueError: |
| pass |
| |
| try: |
| return socket.getservbyname(port) |
| except socket.error: |
| raise Abort(_("no port number associated with service '%s'") % port) |
| |
| _booleans = {'1': True, 'yes': True, 'true': True, 'on': True, 'always': True, |
| '0': False, 'no': False, 'false': False, 'off': False, |
| 'never': False} |
| |
| def parsebool(s): |
| """Parse s into a boolean. |
| |
| If s is not a valid boolean, returns None. |
| """ |
| return _booleans.get(s.lower(), None) |
| |
| _hexdig = '0123456789ABCDEFabcdef' |
| _hextochr = dict((a + b, chr(int(a + b, 16))) |
| for a in _hexdig for b in _hexdig) |
| |
| def _urlunquote(s): |
| """Decode HTTP/HTML % encoding. |
| |
| >>> _urlunquote('abc%20def') |
| 'abc def' |
| """ |
| res = s.split('%') |
| # fastpath |
| if len(res) == 1: |
| return s |
| s = res[0] |
| for item in res[1:]: |
| try: |
| s += _hextochr[item[:2]] + item[2:] |
| except KeyError: |
| s += '%' + item |
| except UnicodeDecodeError: |
| s += unichr(int(item[:2], 16)) + item[2:] |
| return s |
| |
| class url(object): |
| r"""Reliable URL parser. |
| |
| This parses URLs and provides attributes for the following |
| components: |
| |
| <scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment> |
| |
| Missing components are set to None. The only exception is |
| fragment, which is set to '' if present but empty. |
| |
| If parsefragment is False, fragment is included in query. If |
| parsequery is False, query is included in path. If both are |
| False, both fragment and query are included in path. |
| |
| See http://www.ietf.org/rfc/rfc2396.txt for more information. |
| |
| Note that for backward compatibility reasons, bundle URLs do not |
| take host names. That means 'bundle://../' has a path of '../'. |
| |
| Examples: |
| |
| >>> url('http://www.ietf.org/rfc/rfc2396.txt') |
| <url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'> |
| >>> url('ssh://[::1]:2200//home/joe/repo') |
| <url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'> |
| >>> url('file:///home/joe/repo') |
| <url scheme: 'file', path: '/home/joe/repo'> |
| >>> url('file:///c:/temp/foo/') |
| <url scheme: 'file', path: 'c:/temp/foo/'> |
| >>> url('bundle:foo') |
| <url scheme: 'bundle', path: 'foo'> |
| >>> url('bundle://../foo') |
| <url scheme: 'bundle', path: '../foo'> |
| >>> url(r'c:\foo\bar') |
| <url path: 'c:\\foo\\bar'> |
| >>> url(r'\\blah\blah\blah') |
| <url path: '\\\\blah\\blah\\blah'> |
| >>> url(r'\\blah\blah\blah#baz') |
| <url path: '\\\\blah\\blah\\blah', fragment: 'baz'> |
| |
| Authentication credentials: |
| |
| >>> url('ssh://joe:xyz@x/repo') |
| <url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'> |
| >>> url('ssh://joe@x/repo') |
| <url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'> |
| |
| Query strings and fragments: |
| |
| >>> url('http://host/a?b#c') |
| <url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'> |
| >>> url('http://host/a?b#c', parsequery=False, parsefragment=False) |
| <url scheme: 'http', host: 'host', path: 'a?b#c'> |
| """ |
| |
| _safechars = "!~*'()+" |
| _safepchars = "/!~*'()+:" |
| _matchscheme = re.compile(r'^[a-zA-Z0-9+.\-]+:').match |
| |
| def __init__(self, path, parsequery=True, parsefragment=True): |
| # We slowly chomp away at path until we have only the path left |
| self.scheme = self.user = self.passwd = self.host = None |
| self.port = self.path = self.query = self.fragment = None |
| self._localpath = True |
| self._hostport = '' |
| self._origpath = path |
| |
| if parsefragment and '#' in path: |
| path, self.fragment = path.split('#', 1) |
| if not path: |
| path = None |
| |
| # special case for Windows drive letters and UNC paths |
| if hasdriveletter(path) or path.startswith(r'\\'): |
| self.path = path |
| return |
| |
| # For compatibility reasons, we can't handle bundle paths as |
| # normal URLS |
| if path.startswith('bundle:'): |
| self.scheme = 'bundle' |
| path = path[7:] |
| if path.startswith('//'): |
| path = path[2:] |
| self.path = path |
| return |
| |
| if self._matchscheme(path): |
| parts = path.split(':', 1) |
| if parts[0]: |
| self.scheme, path = parts |
| self._localpath = False |
| |
| if not path: |
| path = None |
| if self._localpath: |
| self.path = '' |
| return |
| else: |
| if self._localpath: |
| self.path = path |
| return |
| |
| if parsequery and '?' in path: |
| path, self.query = path.split('?', 1) |
| if not path: |
| path = None |
| if not self.query: |
| self.query = None |
| |
| # // is required to specify a host/authority |
| if path and path.startswith('//'): |
| parts = path[2:].split('/', 1) |
| if len(parts) > 1: |
| self.host, path = parts |
| else: |
| self.host = parts[0] |
| path = None |
| if not self.host: |
| self.host = None |
| # path of file:///d is /d |
| # path of file:///d:/ is d:/, not /d:/ |
| if path and not hasdriveletter(path): |
| path = '/' + path |
| |
| if self.host and '@' in self.host: |
| self.user, self.host = self.host.rsplit('@', 1) |
| if ':' in self.user: |
| self.user, self.passwd = self.user.split(':', 1) |
| if not self.host: |
| self.host = None |
| |
| # Don't split on colons in IPv6 addresses without ports |
| if (self.host and ':' in self.host and |
| not (self.host.startswith('[') and self.host.endswith(']'))): |
| self._hostport = self.host |
| self.host, self.port = self.host.rsplit(':', 1) |
| if not self.host: |
| self.host = None |
| |
| if (self.host and self.scheme == 'file' and |
| self.host not in ('localhost', '127.0.0.1', '[::1]')): |
| raise Abort(_('file:// URLs can only refer to localhost')) |
| |
| self.path = path |
| |
| # leave the query string escaped |
| for a in ('user', 'passwd', 'host', 'port', |
| 'path', 'fragment'): |
| v = getattr(self, a) |
| if v is not None: |
| setattr(self, a, _urlunquote(v)) |
| |
| def __repr__(self): |
| attrs = [] |
| for a in ('scheme', 'user', 'passwd', 'host', 'port', 'path', |
| 'query', 'fragment'): |
| v = getattr(self, a) |
| if v is not None: |
| attrs.append('%s: %r' % (a, v)) |
| return '<url %s>' % ', '.join(attrs) |
| |
| def __str__(self): |
| r"""Join the URL's components back into a URL string. |
| |
| Examples: |
| |
| >>> str(url('http://user:pw@host:80/c:/bob?fo:oo#ba:ar')) |
| 'http://user:pw@host:80/c:/bob?fo:oo#ba:ar' |
| >>> str(url('http://user:pw@host:80/?foo=bar&baz=42')) |
| 'http://user:pw@host:80/?foo=bar&baz=42' |
| >>> str(url('http://user:pw@host:80/?foo=bar%3dbaz')) |
| 'http://user:pw@host:80/?foo=bar%3dbaz' |
| >>> str(url('ssh://user:pw@[::1]:2200//home/joe#')) |
| 'ssh://user:pw@[::1]:2200//home/joe#' |
| >>> str(url('http://localhost:80//')) |
| 'http://localhost:80//' |
| >>> str(url('http://localhost:80/')) |
| 'http://localhost:80/' |
| >>> str(url('http://localhost:80')) |
| 'http://localhost:80/' |
| >>> str(url('bundle:foo')) |
| 'bundle:foo' |
| >>> str(url('bundle://../foo')) |
| 'bundle:../foo' |
| >>> str(url('path')) |
| 'path' |
| >>> str(url('file:///tmp/foo/bar')) |
| 'file:///tmp/foo/bar' |
| >>> str(url('file:///c:/tmp/foo/bar')) |
| 'file:///c:/tmp/foo/bar' |
| >>> print url(r'bundle:foo\bar') |
| bundle:foo\bar |
| """ |
| if self._localpath: |
| s = self.path |
| if self.scheme == 'bundle': |
| s = 'bundle:' + s |
| if self.fragment: |
| s += '#' + self.fragment |
| return s |
| |
| s = self.scheme + ':' |
| if self.user or self.passwd or self.host: |
| s += '//' |
| elif self.scheme and (not self.path or self.path.startswith('/') |
| or hasdriveletter(self.path)): |
| s += '//' |
| if hasdriveletter(self.path): |
| s += '/' |
| if self.user: |
| s += urllib.quote(self.user, safe=self._safechars) |
| if self.passwd: |
| s += ':' + urllib.quote(self.passwd, safe=self._safechars) |
| if self.user or self.passwd: |
| s += '@' |
| if self.host: |
| if not (self.host.startswith('[') and self.host.endswith(']')): |
| s += urllib.quote(self.host) |
| else: |
| s += self.host |
| if self.port: |
| s += ':' + urllib.quote(self.port) |
| if self.host: |
| s += '/' |
| if self.path: |
| # TODO: similar to the query string, we should not unescape the |
| # path when we store it, the path might contain '%2f' = '/', |
| # which we should *not* escape. |
| s += urllib.quote(self.path, safe=self._safepchars) |
| if self.query: |
| # we store the query in escaped form. |
| s += '?' + self.query |
| if self.fragment is not None: |
| s += '#' + urllib.quote(self.fragment, safe=self._safepchars) |
| return s |
| |
| def authinfo(self): |
| user, passwd = self.user, self.passwd |
| try: |
| self.user, self.passwd = None, None |
| s = str(self) |
| finally: |
| self.user, self.passwd = user, passwd |
| if not self.user: |
| return (s, None) |
| # authinfo[1] is passed to urllib2 password manager, and its |
| # URIs must not contain credentials. The host is passed in the |
| # URIs list because Python < 2.4.3 uses only that to search for |
| # a password. |
| return (s, (None, (s, self.host), |
| self.user, self.passwd or '')) |
| |
| def isabs(self): |
| if self.scheme and self.scheme != 'file': |
| return True # remote URL |
| if hasdriveletter(self.path): |
| return True # absolute for our purposes - can't be joined() |
| if self.path.startswith(r'\\'): |
| return True # Windows UNC path |
| if self.path.startswith('/'): |
| return True # POSIX-style |
| return False |
| |
| def localpath(self): |
| if self.scheme == 'file' or self.scheme == 'bundle': |
| path = self.path or '/' |
| # For Windows, we need to promote hosts containing drive |
| # letters to paths with drive letters. |
| if hasdriveletter(self._hostport): |
| path = self._hostport + '/' + self.path |
| elif (self.host is not None and self.path |
| and not hasdriveletter(path)): |
| path = '/' + path |
| return path |
| return self._origpath |
| |
| def hasscheme(path): |
| return bool(url(path).scheme) |
| |
| def hasdriveletter(path): |
| return path and path[1:2] == ':' and path[0:1].isalpha() |
| |
| def urllocalpath(path): |
| return url(path, parsequery=False, parsefragment=False).localpath() |
| |
| def hidepassword(u): |
| '''hide user credential in a url string''' |
| u = url(u) |
| if u.passwd: |
| u.passwd = '***' |
| return str(u) |
| |
| def removeauth(u): |
| '''remove all authentication information from a url string''' |
| u = url(u) |
| u.user = u.passwd = None |
| return str(u) |
| |
| def isatty(fd): |
| try: |
| return fd.isatty() |
| except AttributeError: |
| return False |
| |
| timecount = unitcountfn( |
| (1, 1e3, _('%.0f s')), |
| (100, 1, _('%.1f s')), |
| (10, 1, _('%.2f s')), |
| (1, 1, _('%.3f s')), |
| (100, 0.001, _('%.1f ms')), |
| (10, 0.001, _('%.2f ms')), |
| (1, 0.001, _('%.3f ms')), |
| (100, 0.000001, _('%.1f us')), |
| (10, 0.000001, _('%.2f us')), |
| (1, 0.000001, _('%.3f us')), |
| (100, 0.000000001, _('%.1f ns')), |
| (10, 0.000000001, _('%.2f ns')), |
| (1, 0.000000001, _('%.3f ns')), |
| ) |
| |
| _timenesting = [0] |
| |
| def timed(func): |
| '''Report the execution time of a function call to stderr. |
| |
| During development, use as a decorator when you need to measure |
| the cost of a function, e.g. as follows: |
| |
| @util.timed |
| def foo(a, b, c): |
| pass |
| ''' |
| |
| def wrapper(*args, **kwargs): |
| start = time.time() |
| indent = 2 |
| _timenesting[0] += indent |
| try: |
| return func(*args, **kwargs) |
| finally: |
| elapsed = time.time() - start |
| _timenesting[0] -= indent |
| sys.stderr.write('%s%s: %s\n' % |
| (' ' * _timenesting[0], func.__name__, |
| timecount(elapsed))) |
| return wrapper |