streamclone: move code out of exchange.py
We bulk move the functions related to streaming clones out of exchange.py
and into streamclone.py. Function names were shortened slightly to drop a
component made redundant by the module name. Docstrings and comments
referencing the old names and locations were updated accordingly.
--- a/mercurial/exchange.py Fri Oct 02 15:58:24 2015 -0700
+++ b/mercurial/exchange.py Fri Oct 02 16:05:52 2015 -0700
@@ -5,11 +5,10 @@
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
-import time
from i18n import _
from node import hex, nullid
import errno, urllib
-import util, scmutil, changegroup, base85, error, store
+import util, scmutil, changegroup, base85, error
import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
import lock as lockmod
import tags
@@ -1468,131 +1467,3 @@
if recordout is not None:
recordout(repo.ui.popbuffer())
return r
-
-# This is it's own function so extensions can override it.
-def _walkstreamfiles(repo):
- return repo.store.walk()
-
-def generatestreamclone(repo):
- """Emit content for a streaming clone.
-
- This is a generator of raw chunks that constitute a streaming clone.
-
- The stream begins with a line of 2 space-delimited integers containing the
- number of entries and total bytes size.
-
- Next, are N entries for each file being transferred. Each file entry starts
- as a line with the file name and integer size delimited by a null byte.
- The raw file data follows. Following the raw file data is the next file
- entry, or EOF.
-
- When used on the wire protocol, an additional line indicating protocol
- success will be prepended to the stream. This function is not responsible
- for adding it.
-
- This function will obtain a repository lock to ensure a consistent view of
- the store is captured. It therefore may raise LockError.
- """
- entries = []
- total_bytes = 0
- # Get consistent snapshot of repo, lock during scan.
- lock = repo.lock()
- try:
- repo.ui.debug('scanning\n')
- for name, ename, size in _walkstreamfiles(repo):
- if size:
- entries.append((name, size))
- total_bytes += size
- finally:
- lock.release()
-
- repo.ui.debug('%d files, %d bytes to transfer\n' %
- (len(entries), total_bytes))
- yield '%d %d\n' % (len(entries), total_bytes)
-
- svfs = repo.svfs
- oldaudit = svfs.mustaudit
- debugflag = repo.ui.debugflag
- svfs.mustaudit = False
-
- try:
- for name, size in entries:
- if debugflag:
- repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
- # partially encode name over the wire for backwards compat
- yield '%s\0%d\n' % (store.encodedir(name), size)
- if size <= 65536:
- fp = svfs(name)
- try:
- data = fp.read(size)
- finally:
- fp.close()
- yield data
- else:
- for chunk in util.filechunkiter(svfs(name), limit=size):
- yield chunk
- finally:
- svfs.mustaudit = oldaudit
-
-def consumestreamclone(repo, fp):
- """Apply the contents from a streaming clone file.
-
- This takes the output from "streamout" and applies it to the specified
- repository.
-
- Like "streamout," the status line added by the wire protocol is not handled
- by this function.
- """
- lock = repo.lock()
- try:
- repo.ui.status(_('streaming all changes\n'))
- l = fp.readline()
- try:
- total_files, total_bytes = map(int, l.split(' ', 1))
- except (ValueError, TypeError):
- raise error.ResponseError(
- _('unexpected response from remote server:'), l)
- repo.ui.status(_('%d files to transfer, %s of data\n') %
- (total_files, util.bytecount(total_bytes)))
- handled_bytes = 0
- repo.ui.progress(_('clone'), 0, total=total_bytes)
- start = time.time()
-
- tr = repo.transaction(_('clone'))
- try:
- for i in xrange(total_files):
- # XXX doesn't support '\n' or '\r' in filenames
- l = fp.readline()
- try:
- name, size = l.split('\0', 1)
- size = int(size)
- except (ValueError, TypeError):
- raise error.ResponseError(
- _('unexpected response from remote server:'), l)
- if repo.ui.debugflag:
- repo.ui.debug('adding %s (%s)\n' %
- (name, util.bytecount(size)))
- # for backwards compat, name was partially encoded
- ofp = repo.svfs(store.decodedir(name), 'w')
- for chunk in util.filechunkiter(fp, limit=size):
- handled_bytes += len(chunk)
- repo.ui.progress(_('clone'), handled_bytes,
- total=total_bytes)
- ofp.write(chunk)
- ofp.close()
- tr.close()
- finally:
- tr.release()
-
- # Writing straight to files circumvented the inmemory caches
- repo.invalidate()
-
- elapsed = time.time() - start
- if elapsed <= 0:
- elapsed = 0.001
- repo.ui.progress(_('clone'), None)
- repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
- (util.bytecount(total_bytes), elapsed,
- util.bytecount(total_bytes / elapsed)))
- finally:
- lock.release()
--- a/mercurial/streamclone.py Fri Oct 02 15:58:24 2015 -0700
+++ b/mercurial/streamclone.py Fri Oct 02 16:05:52 2015 -0700
@@ -7,14 +7,144 @@
from __future__ import absolute_import
+import time
+
from .i18n import _
from . import (
branchmap,
error,
- exchange,
+ store,
util,
)
+# This is its own function so extensions can override it.
+def _walkstreamfiles(repo):
+ return repo.store.walk()
+
+def generatev1(repo):
+ """Emit content for version 1 of a streaming clone.
+
+ This is a generator of raw chunks that constitute a streaming clone.
+
+ The stream begins with a line of 2 space-delimited integers containing the
+    number of entries and the total size in bytes of the files that follow.
+
+    Next are N entries, one for each file being transferred. Each file entry
+    starts as a line with the file name and integer size delimited by a null
+    byte. The raw file data follows. After the raw file data comes the next
+    file entry, or EOF.
+
+ When used on the wire protocol, an additional line indicating protocol
+ success will be prepended to the stream. This function is not responsible
+ for adding it.
+
+ This function will obtain a repository lock to ensure a consistent view of
+ the store is captured. It therefore may raise LockError.
+ """
+ entries = []
+ total_bytes = 0
+ # Get consistent snapshot of repo, lock during scan.
+ lock = repo.lock()
+ try:
+ repo.ui.debug('scanning\n')
+ for name, ename, size in _walkstreamfiles(repo):
+ if size:
+ entries.append((name, size))
+ total_bytes += size
+ finally:
+ lock.release()
+
+ repo.ui.debug('%d files, %d bytes to transfer\n' %
+ (len(entries), total_bytes))
+ yield '%d %d\n' % (len(entries), total_bytes)
+
+ svfs = repo.svfs
+ oldaudit = svfs.mustaudit
+ debugflag = repo.ui.debugflag
+ svfs.mustaudit = False
+
+ try:
+ for name, size in entries:
+ if debugflag:
+ repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
+ # partially encode name over the wire for backwards compat
+ yield '%s\0%d\n' % (store.encodedir(name), size)
+ if size <= 65536:
+ fp = svfs(name)
+ try:
+ data = fp.read(size)
+ finally:
+ fp.close()
+ yield data
+ else:
+ for chunk in util.filechunkiter(svfs(name), limit=size):
+ yield chunk
+ finally:
+ svfs.mustaudit = oldaudit
+
+def consumev1(repo, fp):
+ """Apply the contents from version 1 of a streaming clone file handle.
+
+    This takes the output from "generatev1()" and applies it to the specified
+    repository.
+
+    As with "generatev1()", the status line added by the wire protocol is not
+    handled by this function.
+ """
+ lock = repo.lock()
+ try:
+ repo.ui.status(_('streaming all changes\n'))
+ l = fp.readline()
+ try:
+ total_files, total_bytes = map(int, l.split(' ', 1))
+ except (ValueError, TypeError):
+ raise error.ResponseError(
+ _('unexpected response from remote server:'), l)
+ repo.ui.status(_('%d files to transfer, %s of data\n') %
+ (total_files, util.bytecount(total_bytes)))
+ handled_bytes = 0
+ repo.ui.progress(_('clone'), 0, total=total_bytes)
+ start = time.time()
+
+ tr = repo.transaction(_('clone'))
+ try:
+ for i in xrange(total_files):
+ # XXX doesn't support '\n' or '\r' in filenames
+ l = fp.readline()
+ try:
+ name, size = l.split('\0', 1)
+ size = int(size)
+ except (ValueError, TypeError):
+ raise error.ResponseError(
+ _('unexpected response from remote server:'), l)
+ if repo.ui.debugflag:
+ repo.ui.debug('adding %s (%s)\n' %
+ (name, util.bytecount(size)))
+ # for backwards compat, name was partially encoded
+ ofp = repo.svfs(store.decodedir(name), 'w')
+ for chunk in util.filechunkiter(fp, limit=size):
+ handled_bytes += len(chunk)
+ repo.ui.progress(_('clone'), handled_bytes,
+ total=total_bytes)
+ ofp.write(chunk)
+ ofp.close()
+ tr.close()
+ finally:
+ tr.release()
+
+    # Writing straight to files circumvented the in-memory caches
+ repo.invalidate()
+
+ elapsed = time.time() - start
+ if elapsed <= 0:
+ elapsed = 0.001
+ repo.ui.progress(_('clone'), None)
+ repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+ (util.bytecount(total_bytes), elapsed,
+ util.bytecount(total_bytes / elapsed)))
+ finally:
+ lock.release()
+
def streamin(repo, remote, remotereqs):
# Save remote branchmap. We will use it later
# to speed up branchcache creation
@@ -46,11 +176,11 @@
"remotebranchmap" is the result of a branchmap lookup on the remote. It
can be None.
"fp" is a file object containing the raw stream data, suitable for
- feeding into exchange.consumestreamclone.
+ feeding into consumev1().
"""
lock = repo.lock()
try:
- exchange.consumestreamclone(repo, fp)
+ consumev1(repo, fp)
# new requirements = old non-format requirements +
# new format-related remote requirements
--- a/mercurial/wireproto.py Fri Oct 02 15:58:24 2015 -0700
+++ b/mercurial/wireproto.py Fri Oct 02 16:05:52 2015 -0700
@@ -26,6 +26,7 @@
exchange,
peer,
pushkey as pushkeymod,
+ streamclone,
util,
)
@@ -720,7 +721,7 @@
try:
# LockError may be raised before the first result is yielded. Don't
# emit output until we're sure we got the lock successfully.
- it = exchange.generatestreamclone(repo)
+ it = streamclone.generatev1(repo)
return streamres(getstream(it))
except error.LockError:
return '2\n'
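
A note on the _walkstreamfiles() indirection kept by this move: it exists so
extensions can override which store files are streamed. A minimal sketch of
such an override, assuming the standard extensions.wrapfunction() mechanism;
the extension module, _filteredwalk(), and the 'skipme/' filter are made up
for illustration and not part of this patch.

    from mercurial import extensions, streamclone

    def _filteredwalk(orig, repo):
        # orig() yields (unencoded name, encoded name, size) tuples from
        # the store; drop anything we do not want to stream. The 'skipme/'
        # prefix below is purely illustrative.
        for name, ename, size in orig(repo):
            if not name.startswith('skipme/'):
                yield name, ename, size

    def extsetup(ui):
        extensions.wrapfunction(streamclone, '_walkstreamfiles',
                                _filteredwalk)

Because generatev1() computes both the entry list and the byte total from
_walkstreamfiles(), a wrapper like this keeps the emitted header consistent
with the filtered payload.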