changeset 26443:d947086d8973

streamclone: move code out of exchange.py We bulk move functions from exchange.py related to streaming clones. Function names were renamed slightly to drop a component redundant with the module name. Docstrings and comments referencing old names and locations were updated accordingly.
author Gregory Szorc <gregory.szorc@gmail.com>
date Fri, 02 Oct 2015 16:05:52 -0700
parents ef8d27f53204
children 623743010133
files mercurial/exchange.py mercurial/streamclone.py mercurial/wireproto.py
diffstat 3 files changed, 136 insertions(+), 134 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/exchange.py	Fri Oct 02 15:58:24 2015 -0700
+++ b/mercurial/exchange.py	Fri Oct 02 16:05:52 2015 -0700
@@ -5,11 +5,10 @@
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
-import time
 from i18n import _
 from node import hex, nullid
 import errno, urllib
-import util, scmutil, changegroup, base85, error, store
+import util, scmutil, changegroup, base85, error
 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
 import lock as lockmod
 import tags
@@ -1468,131 +1467,3 @@
         if recordout is not None:
             recordout(repo.ui.popbuffer())
     return r
-
-# This is it's own function so extensions can override it.
-def _walkstreamfiles(repo):
-    return repo.store.walk()
-
-def generatestreamclone(repo):
-    """Emit content for a streaming clone.
-
-    This is a generator of raw chunks that constitute a streaming clone.
-
-    The stream begins with a line of 2 space-delimited integers containing the
-    number of entries and total bytes size.
-
-    Next, are N entries for each file being transferred. Each file entry starts
-    as a line with the file name and integer size delimited by a null byte.
-    The raw file data follows. Following the raw file data is the next file
-    entry, or EOF.
-
-    When used on the wire protocol, an additional line indicating protocol
-    success will be prepended to the stream. This function is not responsible
-    for adding it.
-
-    This function will obtain a repository lock to ensure a consistent view of
-    the store is captured. It therefore may raise LockError.
-    """
-    entries = []
-    total_bytes = 0
-    # Get consistent snapshot of repo, lock during scan.
-    lock = repo.lock()
-    try:
-        repo.ui.debug('scanning\n')
-        for name, ename, size in _walkstreamfiles(repo):
-            if size:
-                entries.append((name, size))
-                total_bytes += size
-    finally:
-            lock.release()
-
-    repo.ui.debug('%d files, %d bytes to transfer\n' %
-                  (len(entries), total_bytes))
-    yield '%d %d\n' % (len(entries), total_bytes)
-
-    svfs = repo.svfs
-    oldaudit = svfs.mustaudit
-    debugflag = repo.ui.debugflag
-    svfs.mustaudit = False
-
-    try:
-        for name, size in entries:
-            if debugflag:
-                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
-            # partially encode name over the wire for backwards compat
-            yield '%s\0%d\n' % (store.encodedir(name), size)
-            if size <= 65536:
-                fp = svfs(name)
-                try:
-                    data = fp.read(size)
-                finally:
-                    fp.close()
-                yield data
-            else:
-                for chunk in util.filechunkiter(svfs(name), limit=size):
-                    yield chunk
-    finally:
-        svfs.mustaudit = oldaudit
-
-def consumestreamclone(repo, fp):
-    """Apply the contents from a streaming clone file.
-
-    This takes the output from "streamout" and applies it to the specified
-    repository.
-
-    Like "streamout," the status line added by the wire protocol is not handled
-    by this function.
-    """
-    lock = repo.lock()
-    try:
-        repo.ui.status(_('streaming all changes\n'))
-        l = fp.readline()
-        try:
-            total_files, total_bytes = map(int, l.split(' ', 1))
-        except (ValueError, TypeError):
-            raise error.ResponseError(
-                _('unexpected response from remote server:'), l)
-        repo.ui.status(_('%d files to transfer, %s of data\n') %
-                       (total_files, util.bytecount(total_bytes)))
-        handled_bytes = 0
-        repo.ui.progress(_('clone'), 0, total=total_bytes)
-        start = time.time()
-
-        tr = repo.transaction(_('clone'))
-        try:
-            for i in xrange(total_files):
-                # XXX doesn't support '\n' or '\r' in filenames
-                l = fp.readline()
-                try:
-                    name, size = l.split('\0', 1)
-                    size = int(size)
-                except (ValueError, TypeError):
-                    raise error.ResponseError(
-                        _('unexpected response from remote server:'), l)
-                if repo.ui.debugflag:
-                    repo.ui.debug('adding %s (%s)\n' %
-                                  (name, util.bytecount(size)))
-                # for backwards compat, name was partially encoded
-                ofp = repo.svfs(store.decodedir(name), 'w')
-                for chunk in util.filechunkiter(fp, limit=size):
-                    handled_bytes += len(chunk)
-                    repo.ui.progress(_('clone'), handled_bytes,
-                                     total=total_bytes)
-                    ofp.write(chunk)
-                ofp.close()
-            tr.close()
-        finally:
-            tr.release()
-
-        # Writing straight to files circumvented the inmemory caches
-        repo.invalidate()
-
-        elapsed = time.time() - start
-        if elapsed <= 0:
-            elapsed = 0.001
-        repo.ui.progress(_('clone'), None)
-        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
-                       (util.bytecount(total_bytes), elapsed,
-                        util.bytecount(total_bytes / elapsed)))
-    finally:
-        lock.release()
--- a/mercurial/streamclone.py	Fri Oct 02 15:58:24 2015 -0700
+++ b/mercurial/streamclone.py	Fri Oct 02 16:05:52 2015 -0700
@@ -7,14 +7,144 @@
 
 from __future__ import absolute_import
 
+import time
+
 from .i18n import _
 from . import (
     branchmap,
     error,
-    exchange,
+    store,
     util,
 )
 
+# This is it's own function so extensions can override it.
+def _walkstreamfiles(repo):
+    return repo.store.walk()
+
+def generatev1(repo):
+    """Emit content for version 1 of a streaming clone.
+
+    This is a generator of raw chunks that constitute a streaming clone.
+
+    The stream begins with a line of 2 space-delimited integers containing the
+    number of entries and total bytes size.
+
+    Next, are N entries for each file being transferred. Each file entry starts
+    as a line with the file name and integer size delimited by a null byte.
+    The raw file data follows. Following the raw file data is the next file
+    entry, or EOF.
+
+    When used on the wire protocol, an additional line indicating protocol
+    success will be prepended to the stream. This function is not responsible
+    for adding it.
+
+    This function will obtain a repository lock to ensure a consistent view of
+    the store is captured. It therefore may raise LockError.
+    """
+    entries = []
+    total_bytes = 0
+    # Get consistent snapshot of repo, lock during scan.
+    lock = repo.lock()
+    try:
+        repo.ui.debug('scanning\n')
+        for name, ename, size in _walkstreamfiles(repo):
+            if size:
+                entries.append((name, size))
+                total_bytes += size
+    finally:
+            lock.release()
+
+    repo.ui.debug('%d files, %d bytes to transfer\n' %
+                  (len(entries), total_bytes))
+    yield '%d %d\n' % (len(entries), total_bytes)
+
+    svfs = repo.svfs
+    oldaudit = svfs.mustaudit
+    debugflag = repo.ui.debugflag
+    svfs.mustaudit = False
+
+    try:
+        for name, size in entries:
+            if debugflag:
+                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
+            # partially encode name over the wire for backwards compat
+            yield '%s\0%d\n' % (store.encodedir(name), size)
+            if size <= 65536:
+                fp = svfs(name)
+                try:
+                    data = fp.read(size)
+                finally:
+                    fp.close()
+                yield data
+            else:
+                for chunk in util.filechunkiter(svfs(name), limit=size):
+                    yield chunk
+    finally:
+        svfs.mustaudit = oldaudit
+
+def consumev1(repo, fp):
+    """Apply the contents from version 1 of a streaming clone file handle.
+
+    This takes the output from "streamout" and applies it to the specified
+    repository.
+
+    Like "streamout," the status line added by the wire protocol is not handled
+    by this function.
+    """
+    lock = repo.lock()
+    try:
+        repo.ui.status(_('streaming all changes\n'))
+        l = fp.readline()
+        try:
+            total_files, total_bytes = map(int, l.split(' ', 1))
+        except (ValueError, TypeError):
+            raise error.ResponseError(
+                _('unexpected response from remote server:'), l)
+        repo.ui.status(_('%d files to transfer, %s of data\n') %
+                       (total_files, util.bytecount(total_bytes)))
+        handled_bytes = 0
+        repo.ui.progress(_('clone'), 0, total=total_bytes)
+        start = time.time()
+
+        tr = repo.transaction(_('clone'))
+        try:
+            for i in xrange(total_files):
+                # XXX doesn't support '\n' or '\r' in filenames
+                l = fp.readline()
+                try:
+                    name, size = l.split('\0', 1)
+                    size = int(size)
+                except (ValueError, TypeError):
+                    raise error.ResponseError(
+                        _('unexpected response from remote server:'), l)
+                if repo.ui.debugflag:
+                    repo.ui.debug('adding %s (%s)\n' %
+                                  (name, util.bytecount(size)))
+                # for backwards compat, name was partially encoded
+                ofp = repo.svfs(store.decodedir(name), 'w')
+                for chunk in util.filechunkiter(fp, limit=size):
+                    handled_bytes += len(chunk)
+                    repo.ui.progress(_('clone'), handled_bytes,
+                                     total=total_bytes)
+                    ofp.write(chunk)
+                ofp.close()
+            tr.close()
+        finally:
+            tr.release()
+
+        # Writing straight to files circumvented the inmemory caches
+        repo.invalidate()
+
+        elapsed = time.time() - start
+        if elapsed <= 0:
+            elapsed = 0.001
+        repo.ui.progress(_('clone'), None)
+        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+                       (util.bytecount(total_bytes), elapsed,
+                        util.bytecount(total_bytes / elapsed)))
+    finally:
+        lock.release()
+
 def streamin(repo, remote, remotereqs):
     # Save remote branchmap. We will use it later
     # to speed up branchcache creation
@@ -46,11 +176,11 @@
     "remotebranchmap" is the result of a branchmap lookup on the remote. It
     can be None.
     "fp" is a file object containing the raw stream data, suitable for
-    feeding into exchange.consumestreamclone.
+    feeding into consumev1().
     """
     lock = repo.lock()
     try:
-        exchange.consumestreamclone(repo, fp)
+        consumev1(repo, fp)
 
         # new requirements = old non-format requirements +
         #                    new format-related remote requirements
--- a/mercurial/wireproto.py	Fri Oct 02 15:58:24 2015 -0700
+++ b/mercurial/wireproto.py	Fri Oct 02 16:05:52 2015 -0700
@@ -26,6 +26,7 @@
     exchange,
     peer,
     pushkey as pushkeymod,
+    streamclone,
     util,
 )
 
@@ -720,7 +721,7 @@
     try:
         # LockError may be raised before the first result is yielded. Don't
         # emit output until we're sure we got the lock successfully.
-        it = exchange.generatestreamclone(repo)
+        it = streamclone.generatev1(repo)
         return streamres(getstream(it))
     except error.LockError:
         return '2\n'