changeset 13760:ed97955e0c04

convert/mtn: add support for using monotone's "automate stdio" when available Currently the convert extension spawns a new mtn process for each operation. For a large repository, this ends up being hundreds of thousands of processes. The following enables usage of monotone's "automate stdio" functionality - documented at: http://www.monotone.ca/docs/Automation.html#index-mtn-automate-stdio-188 The effect is that (after determining that a new enough mtn executable is available) a single long-running mtn process is used for all the operations, using stdin/stdout to send commands and read output. This has a pretty significant effect on the performance of some parts of the conversion process.
author Daniel Atallah <daniel.atallah@gmail.com>
date Wed, 23 Mar 2011 14:26:56 -0400
parents 49b818fd26d8
children aeb41f0048e7
files hgext/convert/monotone.py
diffstat 1 files changed, 134 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/hgext/convert/monotone.py	Thu Mar 24 16:54:09 2011 -0400
+++ b/hgext/convert/monotone.py	Wed Mar 23 14:26:56 2011 -0400
@@ -19,6 +19,7 @@
 
         self.ui = ui
         self.path = path
+        self.automatestdio = False
 
         norepo = NoRepo(_("%s does not look like a monotone repository")
                         % path)
@@ -73,9 +74,102 @@
         self.rev = rev
 
     def mtnrun(self, *args, **kwargs):
+        """Run an mtn automate command via the currently selected runner.
+
+        When self.automatestdio is false the call goes to mtnrunsingle;
+        otherwise it goes to mtnrunstdio. Arguments are passed through
+        unchanged and the runner's return value is returned as-is.
+        """
+        if not self.automatestdio:
+            return self.mtnrunsingle(*args, **kwargs)
+        return self.mtnrunstdio(*args, **kwargs)
+
+    def mtnrunsingle(self, *args, **kwargs):
+        """Run a single automate command through self.run0.
+
+        The 'd' keyword is always overwritten with self.path before the
+        call, and 'automate' is prepended to the positional arguments.
+        run0 is defined outside this hunk; per the changeset description
+        this path spawns a new mtn process for each operation.
+        """
         kwargs['d'] = self.path
         return self.run0('automate', *args, **kwargs)
 
+    def mtnrunstdio(self, *args, **kwargs):
+        """Send one command to the automate stdio process and read the reply.
+
+        Keyword arguments are encoded as the option section ('o' ... 'e')
+        and positional arguments as the command section ('l' ... 'e').
+        Every string is written as "<length>:<string>". The encoded
+        command is written to self.mtnwritefp and flushed, then the
+        result of mtnstdioreadcommandoutput is returned.
+        """
+        # Prepare the command in automate stdio format
+        command = []
+        for k, v in kwargs.iteritems():
+            command.append("%s:%s" % (len(k), k))
+            # NOTE(review): an option with an empty value contributes only
+            # its name here - confirm against the automate stdio input
+            # format whether an explicit "0:" value is required.
+            if v:
+                command.append("%s:%s" % (len(v), v))
+        if command:
+            command.insert(0, 'o')
+            command.append('e')
+
+        command.append('l')
+        for arg in args:
+            # one list entry per argument; "+=" with a string would extend
+            # the list character by character
+            command.append("%s:%s" % (len(arg), arg))
+        command.append('e')
+        command = ''.join(command)
+
+        self.ui.debug("mtn: sending '%s'\n" % command)
+        self.mtnwritefp.write(command)
+        self.mtnwritefp.flush()
+
+        return self.mtnstdioreadcommandoutput(command)
+
+    def mtnstdioreadpacket(self):
+        """Read one "<commandnbr>:<stream>:<size>:<payload>" packet.
+
+        Reads from self.mtnreadfp and returns the tuple
+        (commandnbr, stream, length, payload): commandnbr and stream are
+        strings, length is the parsed payload size and payload is the
+        data that followed it.
+
+        Raises util.Abort when the pipe ends before a field is complete,
+        when the stream type is not one of 'mewptl', when the size is not
+        a number, or when fewer than `length` payload bytes can be read.
+        """
+        read = None
+        commandnbr = ''
+        while read != ':':
+            read = self.mtnreadfp.read(1)
+            if not read:
+                raise util.Abort(_('bad mtn packet - no end of commandnbr'))
+            commandnbr += read
+        # drop the ':' terminator that ended the loop
+        commandnbr = commandnbr[:-1]
+
+        stream = self.mtnreadfp.read(1)
+        # an empty read (EOF) is "in" every string, so reject it explicitly
+        if not stream or stream not in 'mewptl':
+            # format after translating so the message lookup still matches
+            raise util.Abort(_('bad mtn packet - bad stream type %s')
+                % stream)
+
+        read = self.mtnreadfp.read(1)
+        if read != ':':
+            raise util.Abort(_('bad mtn packet - no divider before size'))
+
+        read = None
+        lengthstr = ''
+        while read != ':':
+            read = self.mtnreadfp.read(1)
+            if not read:
+                raise util.Abort(_('bad mtn packet - no end of packet size'))
+            lengthstr += read
+        try:
+            length = long(lengthstr[:-1])
+        except (TypeError, ValueError):
+            # long() raises ValueError for a non-numeric string
+            raise util.Abort(_('bad mtn packet - bad packet size %s')
+                % lengthstr)
+
+        read = self.mtnreadfp.read(length)
+        if len(read) != length:
+            raise util.Abort(_("bad mtn packet - unable to read full packet "
+                "read %s of %s") % (len(read), length))
+
+        return (commandnbr, stream, length, read)
+
+    def mtnstdioreadcommandoutput(self, command):
+        """Collect the reply packets for one command and return its output.
+
+        Packets are read with mtnstdioreadpacket until an 'l' packet
+        arrives. 'm' payloads are concatenated in arrival order and
+        returned; 'e' and 'w' payloads go to ui.warn; 'p' payloads go to
+        ui.debug; any other stream type is ignored.
+
+        `command` is only used in the error message. Raises util.Abort
+        when the payload of the 'l' packet is not '0'.
+        """
+        chunks = []
+        while True:
+            commandnbr, stream, length, output = self.mtnstdioreadpacket()
+            self.ui.debug('mtn: read packet %s:%s:%s\n' %
+                (commandnbr, stream, length))
+
+            if stream == 'l':
+                # End of command
+                if output != '0':
+                    raise util.Abort(_("mtn command '%s' returned %s") %
+                        (command, output))
+                break
+            elif stream in 'ew':
+                # Error, warning output
+                self.ui.warn(_('%s error:\n') % self.command)
+                self.ui.warn(output)
+            elif stream == 'p':
+                # Progress messages
+                self.ui.debug('mtn: ' + output)
+            elif stream == 'm':
+                # Main stream - command output. A reply may be split over
+                # several packets, so accumulate instead of keeping only
+                # the last one.
+                chunks.append(output)
+
+        return ''.join(chunks)
+
     def mtnloadmanifest(self, rev):
         if self.manifest_rev == rev:
             return
@@ -225,3 +319,43 @@
         # This function is only needed to support --filemap
         # ... and we don't support that
         raise NotImplementedError()
+
+    def before(self):
+        """Decide whether automate stdio can be used and, if so, start it.
+
+        The automate interface version is queried with mtnrunsingle. For
+        versions below 12.0 only a debug message is emitted. Otherwise
+        self.automatestdio is set, the "automate stdio" process is
+        started through self._run2, and its header block is consumed up
+        to the terminating blank line.
+
+        Raises util.Abort when the version cannot be determined, when the
+        first header line is not 'format-version: 2', or when the pipe
+        ends before the blank line.
+        """
+        # Check if we have a new enough version to use automate stdio
+        try:
+            ifaceversion = float(self.mtnrunsingle("interface_version"))
+        except Exception:
+            raise util.Abort(_("unable to determine mtn automate interface "
+                "version"))
+
+        if not ifaceversion >= 12.0:
+            self.ui.debug("mtn automate version %s - not using automate stdio "
+                "(automate >= 12.0 - mtn >= 0.46 is needed)\n" % ifaceversion)
+            return
+
+        self.automatestdio = True
+        self.ui.debug("mtn automate version %s - using automate stdio\n" %
+            ifaceversion)
+
+        # launch the long-running automate stdio process
+        pipes = self._run2('automate', 'stdio', '-d', self.path)
+        self.mtnwritefp, self.mtnreadfp = pipes
+
+        # read the headers
+        header = self.mtnreadfp.readline()
+        if header != 'format-version: 2\n':
+            raise util.Abort(_('mtn automate stdio header unexpected: %s')
+                % header)
+        while True:
+            header = self.mtnreadfp.readline()
+            if header == '\n':
+                # blank line terminates the header block
+                break
+            if not header:
+                raise util.Abort(_("failed to reach end of mtn automate "
+                    "stdio headers"))
+
+    def after(self):
+        """Close the automate stdio pipes if they are in use.
+
+        Does nothing unless self.automatestdio is set. The write pipe is
+        closed and reset to None first, then the read pipe.
+        """
+        if not self.automatestdio:
+            return
+        for pipename in ('mtnwritefp', 'mtnreadfp'):
+            getattr(self, pipename).close()
+            setattr(self, pipename, None)
+