diff mercurial/wireproto.py @ 25235:41965bf23295

exchange: move code for generating a streaming clone into exchange

Streaming clones are fast because they are essentially tar files. On
mozilla-central, a streaming clone only consumes ~55s CPU time on clients
as opposed to ~340s CPU time for a regular clone or gzip bundle unbundle.

Mozilla is deploying static file "lookaside" support to our Mercurial
server. Static bundles are pre-generated and uploaded to S3. When a clone
is performed, the static file is fetched, applied, and then an incremental
pull is performed. Unfortunately, on an ideal network connection this
still takes as much wall and CPU time as a regular clone (although it does
save significant server resources). We like the client-side wall time wins
of streaming clones. But we want to leverage S3-based pre-generated files
for serving the bulk of clone data.

This patch moves the code for producing a "stream bundle" into its own
standalone function, away from the wire protocol. This will enable stream
bundle files to be produced outside the context of the wire protocol.

A bikeshed on whether exchange is the best module for this function might
be warranted. I selected exchange instead of changegroup because "stream
bundles" aren't changegroups (yet).
author Gregory Szorc <gregory.szorc@gmail.com>
date Thu, 21 May 2015 10:27:22 -0700
parents 6dbbb4fa1892
children a415e94fd34f
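
The exchange.py half of this change is not included in the hunk below. Based
on the code being removed from wireproto.py, the new
exchange.generatestreamclone() generator presumably looks roughly like the
following sketch (a reconstruction for context, not the verbatim patch;
_walkstreamfiles is assumed to move along with it as the thin wrapper around
repo.store.walk() that the deleted helper below was):

# Imports shown for self-containedness; inside mercurial/exchange.py these
# modules are already available.
from mercurial import store, util

def _walkstreamfiles(repo):
    # kept as its own function so extensions can override it
    return repo.store.walk()

def generatestreamclone(repo):
    '''Emit content for a streaming clone.

    Yields the raw chunks of a stream bundle: a header line containing the
    number of files and the total byte count, then, for each file, a line
    with the store-encoded file name and its size separated by a NUL byte,
    followed by the raw file data. The wire protocol success line is not
    part of this stream; the caller prepends it.
    '''
    entries = []
    total_bytes = 0
    # Get a consistent snapshot of the repo: hold the lock during the scan.
    lock = repo.lock()
    try:
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size
    finally:
        lock.release()

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))
    yield '%d %d\n' % (len(entries), total_bytes)

    svfs = repo.svfs
    oldaudit = svfs.mustaudit
    debugflag = repo.ui.debugflag
    svfs.mustaudit = False
    try:
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            if size <= 65536:
                fp = svfs(name)
                try:
                    data = fp.read(size)
                finally:
                    fp.close()
                yield data
            else:
                for chunk in util.filechunkiter(svfs(name), limit=size):
                    yield chunk
    finally:
        svfs.mustaudit = oldaudit

Because this stays a generator, the stream_out command below can keep
wrapping it in streamres() and send chunks as they are produced without
buffering the whole clone in memory, and a standalone caller can write the
same chunks straight to a file.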
--- a/mercurial/wireproto.py	Tue May 19 10:13:43 2015 -0700
+++ b/mercurial/wireproto.py	Thu May 21 10:27:22 2015 -0700
@@ -744,73 +744,27 @@
 def _allowstream(ui):
     return ui.configbool('server', 'uncompressed', True, untrusted=True)
 
-def _walkstreamfiles(repo):
-    # this is it's own function so extensions can override it
-    return repo.store.walk()
-
 @wireprotocommand('stream_out')
 def stream(repo, proto):
     '''If the server supports streaming clone, it advertises the "stream"
     capability with a value representing the version and flags of the repo
     it is serving. Client checks to see if it understands the format.
-
-    The format is simple: the server writes out a line with the amount
-    of files, then the total amount of bytes to be transferred (separated
-    by a space). Then, for each file, the server first writes the filename
-    and file size (separated by the null character), then the file contents.
     '''
-
     if not _allowstream(repo.ui):
         return '1\n'
 
-    entries = []
-    total_bytes = 0
-    try:
-        # get consistent snapshot of repo, lock during scan
-        lock = repo.lock()
-        try:
-            repo.ui.debug('scanning\n')
-            for name, ename, size in _walkstreamfiles(repo):
-                if size:
-                    entries.append((name, size))
-                    total_bytes += size
-        finally:
-            lock.release()
-    except error.LockError:
-        return '2\n' # error: 2
-
-    def streamer(repo, entries, total):
-        '''stream out all metadata files in repository.'''
-        yield '0\n' # success
-        repo.ui.debug('%d files, %d bytes to transfer\n' %
-                      (len(entries), total_bytes))
-        yield '%d %d\n' % (len(entries), total_bytes)
+    def getstream(it):
+        yield '0\n'
+        for chunk in it:
+            yield chunk
 
-        sopener = repo.svfs
-        oldaudit = sopener.mustaudit
-        debugflag = repo.ui.debugflag
-        sopener.mustaudit = False
-
-        try:
-            for name, size in entries:
-                if debugflag:
-                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
-                # partially encode name over the wire for backwards compat
-                yield '%s\0%d\n' % (store.encodedir(name), size)
-                if size <= 65536:
-                    fp = sopener(name)
-                    try:
-                        data = fp.read(size)
-                    finally:
-                        fp.close()
-                    yield data
-                else:
-                    for chunk in util.filechunkiter(sopener(name), limit=size):
-                        yield chunk
-        finally:
-            sopener.mustaudit = oldaudit
-
-    return streamres(streamer(repo, entries, total_bytes))
+    try:
+        # LockError may be raised before the first result is yielded. Don't
+        # emit output until we're sure we got the lock successfully.
+        it = exchange.generatestreamclone(repo)
+        return streamres(getstream(it))
+    except error.LockError:
+        return '2\n'
 
 @wireprotocommand('unbundle', 'heads')
 def unbundle(repo, proto, heads):
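
For reference, the payload format described by the docstring removed above (a
status line, then a "<file count> <total bytes>" header line, then for each
file a name/size line followed by the raw file contents) can be consumed with
a parser along these lines. This is purely an illustrative sketch, not
Mercurial's client code, which applies the stream directly into the
repository store and decodes the store-encoded names:

import os

def consumestreamclone(fh, destdir):
    # Illustrative consumer for the stream_out payload; not Mercurial's
    # actual client implementation. 'fh' is a file-like object positioned
    # at the start of the server response, 'destdir' a scratch directory.
    status = fh.readline()
    # '0\n' means success; '1\n' means streaming is disabled on the
    # server; '2\n' means the server could not lock the repository.
    if status != '0\n':
        raise Exception('streaming clone failed: %s' % status.strip())
    # Header line: "<number of files> <total bytes>"
    filecount, totalbytes = [int(v) for v in fh.readline().split()]
    for _ in range(filecount):
        # Per-file header: "<store-encoded name>\0<size>"
        name, size = fh.readline()[:-1].split('\0', 1)
        size = int(size)
        # A real client would decode the name (store.decodedir) and write
        # into .hg/store; here we just dump the raw files under destdir.
        # totalbytes could drive progress reporting.
        path = os.path.join(destdir, name)
        dirname = os.path.dirname(path)
        if dirname and not os.path.isdir(dirname):
            os.makedirs(dirname)
        out = open(path, 'wb')
        try:
            remaining = size
            while remaining > 0:
                chunk = fh.read(min(remaining, 65536))
                if not chunk:
                    raise Exception('premature end of stream')
                out.write(chunk)
                remaining -= len(chunk)
        finally:
            out.close()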