scmutil: support background file closing
authorGregory Szorc <gregory.szorc@gmail.com>
Thu, 14 Jan 2016 13:34:59 -0800
changeset 27895 2d6a89e79b48
parent 27894 a94f7eef3199
child 27896 1d29893240cc
scmutil: support background file closing Closing files that have been appended to is relatively slow on Windows/NTFS. This makes several Mercurial operations slower on Windows. The workaround to this issue is conceptually simple: use multiple threads for I/O. Unfortunately, Python doesn't scale well to multiple threads because of the GIL. And, refactoring our code to use threads everywhere would be a huge undertaking. So, we decide to tackle this problem by starting small: establishing a thread pool for closing files. This patch establishes a mechanism for closing file handles on separate threads. The coordinator object is basically a queue of file handles to operate on and a thread pool consuming from the queue. When files are opened through the VFS layer, the caller can specify that delay closing is allowed. A proxy class for file handles has been added. We must use a proxy because it isn't possible to modify __class__ on built-in types. This adds some overhead. But as future patches will show, this overhead is cancelled out by the benefit of closing file handles on background threads.
mercurial/help/config.txt
mercurial/scmutil.py
--- a/mercurial/help/config.txt	Tue Jan 12 23:56:48 2016 +0900
+++ b/mercurial/help/config.txt	Thu Jan 14 13:34:59 2016 -0800
@@ -2037,3 +2037,27 @@
     Number of CPUs to use for parallel operations. A zero or
     negative value is treated as ``use the default``.
     (default: 4 or the number of CPUs on the system, whichever is larger)
+
+``backgroundclose``
+    Whether to enable closing file handles on background threads during certain
+    operations. Some platforms aren't very efficient at closing file
+    handles that have been written or appened to. By performing file closing
+    on background threads, file write rate can increase substantially.
+    (default: true on Windows, false elsewhere)
+
+``backgroundcloseminfilecount``
+    Minimum number of files required to trigger background file closing.
+    Operations not writing this many files won't start background close
+    threads.
+    (default: 2048)
+
+``backgroundclosemaxqueue``
+    The maximum number of opened file handles waiting to be closed in the
+    background. This option only has an effect if ``backgroundclose`` is
+    enabled.
+    (default: 384)
+
+``backgroundclosethreadcount``
+    Number of threads to process background file closes. Only relevant if
+    ``backgroundclose`` is enabled.
+    (default: 4)
--- a/mercurial/scmutil.py	Tue Jan 12 23:56:48 2016 +0900
+++ b/mercurial/scmutil.py	Thu Jan 14 13:34:59 2016 -0800
@@ -7,6 +7,8 @@
 
 from __future__ import absolute_import
 
+import Queue
+import contextlib
 import errno
 import glob
 import os
@@ -14,6 +16,7 @@
 import shutil
 import stat
 import tempfile
+import threading
 
 from .i18n import _
 from .node import wdirrev
@@ -254,7 +257,7 @@
         return []
 
     def open(self, path, mode="r", text=False, atomictemp=False,
-             notindexed=False):
+             notindexed=False, backgroundclose=False):
         '''Open ``path`` file, which is relative to vfs root.
 
         Newly created directories are marked as "not to be indexed by
@@ -262,7 +265,8 @@
         for "write" mode access.
         '''
         self.open = self.__call__
-        return self.__call__(path, mode, text, atomictemp, notindexed)
+        return self.__call__(path, mode, text, atomictemp, notindexed,
+                             backgroundclose=backgroundclose)
 
     def read(self, path):
         with self(path, 'rb') as fp:
@@ -436,6 +440,27 @@
         for dirpath, dirs, files in os.walk(self.join(path), onerror=onerror):
             yield (dirpath[prefixlen:], dirs, files)
 
+    @contextlib.contextmanager
+    def backgroundclosing(self, ui, expectedcount=-1):
+        """Allow files to be closed asynchronously.
+
+        When this context manager is active, ``backgroundclose`` can be passed
+        to ``__call__``/``open`` to result in the file possibly being closed
+        asynchronously, on a background thread.
+        """
+        # This is an arbitrary restriction and could be changed if we ever
+        # have a use case.
+        vfs = getattr(self, 'vfs', self)
+        if getattr(vfs, '_backgroundfilecloser', None):
+            raise error.Abort('can only have 1 active background file closer')
+
+        with backgroundfilecloser(ui, expectedcount=expectedcount) as bfc:
+            try:
+                vfs._backgroundfilecloser = bfc
+                yield bfc
+            finally:
+                vfs._backgroundfilecloser = None
+
 class vfs(abstractvfs):
     '''Operate files relative to a base directory
 
@@ -478,12 +503,25 @@
         os.chmod(name, self.createmode & 0o666)
 
     def __call__(self, path, mode="r", text=False, atomictemp=False,
-                 notindexed=False):
+                 notindexed=False, backgroundclose=False):
         '''Open ``path`` file, which is relative to vfs root.
 
         Newly created directories are marked as "not to be indexed by
         the content indexing service", if ``notindexed`` is specified
         for "write" mode access.
+
+        If ``backgroundclose`` is passed, the file may be closed asynchronously.
+        It can only be used if the ``self.backgroundclosing()`` context manager
+        is active. This should only be specified if the following criteria hold:
+
+        1. There is a potential for writing thousands of files. Unless you
+           are writing thousands of files, the performance benefits of
+           asynchronously closing files is not realized.
+        2. Files are opened exactly once for the ``backgroundclosing``
+           active duration and are therefore free of race conditions between
+           closing a file on a background thread and reopening it. (If the
+           file were opened multiple times, there could be unflushed data
+           because the original file handle hasn't been flushed/closed yet.)
         '''
         if self._audit:
             r = util.checkosfilename(path)
@@ -528,6 +566,14 @@
         fp = util.posixfile(f, mode)
         if nlink == 0:
             self._fixfilemode(f)
+
+        if backgroundclose:
+            if not self._backgroundfilecloser:
+                raise error.Abort('backgroundclose can only be used when a '
+                                  'backgroundclosing context manager is active')
+
+            fp = delayclosedfile(fp, self._backgroundfilecloser)
+
         return fp
 
     def symlink(self, src, dst):
@@ -1211,3 +1257,123 @@
     """
     # experimental config: format.generaldelta
     return ui.configbool('format', 'generaldelta', False)
+
+class delayclosedfile(object):
+    """Proxy for a file object whose close is delayed.
+
+    Do not instantiate outside of the vfs layer.
+    """
+
+    def __init__(self, fh, closer):
+        object.__setattr__(self, '_origfh', fh)
+        object.__setattr__(self, '_closer', closer)
+
+    def __getattr__(self, attr):
+        return getattr(self._origfh, attr)
+
+    def __setattr__(self, attr, value):
+        return setattr(self._origfh, attr, value)
+
+    def __delattr__(self, attr):
+        return delattr(self._origfh, attr)
+
+    def __enter__(self):
+        return self._origfh.__enter__()
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        self._closer.close(self._origfh)
+
+    def close(self):
+        self._closer.close(self._origfh)
+
+class backgroundfilecloser(object):
+    """Coordinates background closing of file handles on multiple threads."""
+    def __init__(self, ui, expectedcount=-1):
+        self._running = False
+        self._entered = False
+        self._threads = []
+        self._threadexception = None
+
+        # Only Windows/NTFS has slow file closing. So only enable by default
+        # on that platform. But allow to be enabled elsewhere for testing.
+        defaultenabled = os.name == 'nt'
+        enabled = ui.configbool('worker', 'backgroundclose', defaultenabled)
+
+        if not enabled:
+            return
+
+        # There is overhead to starting and stopping the background threads.
+        # Don't do background processing unless the file count is large enough
+        # to justify it.
+        minfilecount = ui.configint('worker', 'backgroundcloseminfilecount',
+                                    2048)
+        # FUTURE dynamically start background threads after minfilecount closes.
+        # (We don't currently have any callers that don't know their file count)
+        if expectedcount > 0 and expectedcount < minfilecount:
+            return
+
+        # Windows defaults to a limit of 512 open files. A buffer of 128
+        # should give us enough headway.
+        maxqueue = ui.configint('worker', 'backgroundclosemaxqueue', 384)
+        threadcount = ui.configint('worker', 'backgroundclosethreadcount', 4)
+
+        ui.debug('starting %d threads for background file closing\n' %
+                 threadcount)
+
+        self._queue = Queue.Queue(maxsize=maxqueue)
+        self._running = True
+
+        for i in range(threadcount):
+            t = threading.Thread(target=self._worker, name='backgroundcloser')
+            self._threads.append(t)
+            t.start()
+
+    def __enter__(self):
+        self._entered = True
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        self._running = False
+
+        # Wait for threads to finish closing so open files don't linger for
+        # longer than lifetime of context manager.
+        for t in self._threads:
+            t.join()
+
+    def _worker(self):
+        """Main routine for worker thread."""
+        while True:
+            try:
+                fh = self._queue.get(block=True, timeout=0.100)
+                # Need to catch or the thread will terminate and
+                # we could orphan file descriptors.
+                try:
+                    fh.close()
+                except Exception as e:
+                    # Stash so can re-raise from main thread later.
+                    self._threadexception = e
+            except Queue.Empty:
+                if not self._running:
+                    break
+
+    def close(self, fh):
+        """Schedule a file for closing."""
+        if not self._entered:
+            raise error.Abort('can only call close() when context manager '
+                              'active')
+
+        # If a background thread encountered an exception, raise now so we fail
+        # fast. Otherwise we may potentially go on for minutes until the error
+        # is acted on.
+        if self._threadexception:
+            e = self._threadexception
+            self._threadexception = None
+            raise e
+
+        # If we're not actively running, close synchronously.
+        if not self._running:
+            fh.close()
+            return
+
+        self._queue.put(fh, block=True, timeout=None)
+