changeset 4946:e8f4e40f285a

convert/subversion: work around memory leak in svn's python bindings The svn.ra.get_log wrapper attaches the hash of changed paths for every log entry to a global memory pool, so memory consumption increases rapidly, with no way to free it. Our workaround is to call this function in a child process, and feed its results back over a pipe. The memory consumption of the child still grows huge (hundreds of megabytes), but at least it goes away once the reading-the-log phase is done.
author Bryan O'Sullivan <bos@serpentine.com>
date Thu, 19 Jul 2007 12:41:07 -0700
parents 382520bacc17
children 3e25a6eb5c9a
files hgext/convert/subversion.py
diffstat 1 files changed, 81 insertions(+), 37 deletions(-) [+]
line wrap: on
line diff
--- a/hgext/convert/subversion.py	Wed Jul 18 22:46:14 2007 -0700
+++ b/hgext/convert/subversion.py	Thu Jul 19 12:41:07 2007 -0700
@@ -15,7 +15,8 @@
 
 import pprint
 import locale
-
+import os
+import cPickle as pickle
 from mercurial import util
 
 # Subversion stuff. Works best with very recent Python SVN bindings
@@ -38,6 +39,12 @@
 
 class CompatibilityException(Exception): pass
 
+class changedpath(object):
+    def __init__(self, p):
+        self.copyfrom_path = p.copyfrom_path
+        self.copyfrom_rev = p.copyfrom_rev
+        self.action = p.action
+
 # SVN conversion code stolen from bzr-svn and tailor
 class convert_svn(converter_source):
     def __init__(self, ui, url, rev=None):
@@ -71,9 +78,9 @@
         self.url = url
         self.encoding = 'UTF-8' # Subversion is always nominal UTF-8
         try:
-            self.transport = transport.SvnRaTransport(url = url)
+            self.transport = transport.SvnRaTransport(url=url)
             self.ra = self.transport.ra
-            self.ctx = svn.client.create_context()
+            self.ctx = self.transport.client
             self.base = svn.ra.get_repos_root(self.ra)
             self.module = self.url[len(self.base):]
             self.modulemap = {} # revision, module
@@ -174,10 +181,65 @@
         del self.commits[rev]
         return commit
 
+    def get_log(self, paths, start, end, limit=0, discover_changed_paths=True,
+                strict_node_history=False):
+        '''wrapper for svn.ra.get_log.
+        on a large repository, svn.ra.get_log pins huge amounts of
+        memory that cannot be recovered.  work around it by forking
+        and writing results over a pipe.'''
+
+        def child(fp):
+            protocol = -1
+            def receiver(orig_paths, revnum, author, date, message, pool):
+                if orig_paths is not None:
+                    for k, v in orig_paths.iteritems():
+                        orig_paths[k] = changedpath(v)
+                pickle.dump((orig_paths, revnum, author, date, message),
+                            fp, protocol)
+
+            try:
+                # Use an ra of our own so that our parent can consume
+                # our results without confusing the server.
+                t = transport.SvnRaTransport(url=self.url)
+                svn.ra.get_log(t.ra, paths, start, end, limit,
+                               discover_changed_paths,
+                               strict_node_history,
+                               receiver)
+            except SubversionException, (_, num):
+                pickle.dump(num, fp, protocol)
+            else:
+                pickle.dump(None, fp, protocol)
+            fp.close()
+
+        def parent(fp):
+            while True:
+                entry = pickle.load(fp)
+                try:
+                    orig_paths, revnum, author, date, message = entry
+                except:
+                    if entry is None:
+                        break
+                    raise SubversionException("child raised exception", entry)
+                yield entry
+
+        rfd, wfd = os.pipe()
+        pid = os.fork()
+        if pid:
+            os.close(wfd)
+            for p in parent(os.fdopen(rfd, 'rb')):
+                yield p
+            ret = os.waitpid(pid, 0)[1]
+            if ret:
+                raise util.Abort(_('get_log %s') % util.explain_exit(ret))
+        else:
+            os.close(rfd)
+            child(os.fdopen(wfd, 'wb'))
+            os._exit(0)
+
     def gettags(self):
         tags = {}
-        def parselogentry(*arg, **args):
-            orig_paths, revnum, author, date, message, pool = arg
+        for entry in self.get_log(['/tags'], 0, self.revnum(self.head)):
+            orig_paths, revnum, author, date, message = entry
             for path in orig_paths:
                 if not path.startswith('/tags/'):
                     continue
@@ -186,15 +248,7 @@
                 rev = ent.copyfrom_rev
                 tag = path.split('/', 2)[2]
                 tags[tag] = self.revid(rev, module=source)
-
-        start = self.revnum(self.head)
-        try:
-            svn.ra.get_log(self.ra, ['/tags'], 0, start, 0, True, False,
-                           parselogentry)
-            return tags
-        except SubversionException:
-            self.ui.note('no tags found at revision %d\n' % start)
-            return {}
+        return tags
 
     # -- helper functions --
 
@@ -276,23 +330,10 @@
             self.ui.debug('Ignoring %r since it is not under %r\n' % (path, module))
             return None
 
-        received = []
-        # svn.ra.get_log requires no other calls to the ra until it completes,
-        # so we just collect the log entries and parse them afterwards
-        def receivelog(orig_paths, revnum, author, date, message, pool):
-            if self.is_blacklisted(revnum):
-                self.ui.note('skipping blacklisted revision %d\n' % revnum)
-                return
-           
-            if orig_paths is None:
-                self.ui.debug('revision %d has no entries\n' % revnum)
-                return
-
-            received.append((revnum, orig_paths.items(), author, date, message))
-
         self.child_cset = None
-        def parselogentry((revnum, orig_paths, author, date, message)):
-            self.ui.debug("parsing revision %d\n" % revnum)
+        def parselogentry(orig_paths, revnum, author, date, message):
+            self.ui.debug("parsing revision %d (%d changes)\n" %
+                          (revnum, len(orig_paths)))
 
             if revnum in self.modulemap:
                 new_module = self.modulemap[revnum]
@@ -318,6 +359,7 @@
             except IndexError:
                 branch = None
 
+            orig_paths = orig_paths.items()
             orig_paths.sort()
             for path, ent in orig_paths:
                 # self.ui.write("path %s\n" % path)
@@ -527,13 +569,15 @@
         try:
             discover_changed_paths = True
             strict_node_history = False
-            svn.ra.get_log(self.ra, [self.module], from_revnum, to_revnum, 0,
-                           discover_changed_paths, strict_node_history,
-                           receivelog)
-            self.ui.note('parsing %d log entries for "%s"\n' %
-                         (len(received), self.module))
-            for entry in received:
-                parselogentry(entry)
+            for entry in self.get_log([self.module], from_revnum, to_revnum):
+                orig_paths, revnum, author, date, message = entry
+                if self.is_blacklisted(revnum):
+                    self.ui.note('skipping blacklisted revision %d\n' % revnum)
+                    continue
+                if orig_paths is None:
+                    self.ui.debug('revision %d has no entries\n' % revnum)
+                    continue
+                parselogentry(orig_paths, revnum, author, date, message)
         except SubversionException, (_, num):
             if num == svn.core.SVN_ERR_FS_NO_SUCH_REVISION:
                 raise NoSuchRevision(branch=self,