# HG changeset patch # User Gregory Szorc # Date 1432229265 25200 # Node ID 5095059340dcb25d36758c0396620c5f3b0f2064 # Parent 41965bf232951a4ad0302277a56d44c965802bc8 exchange: move code for consuming streaming clone into exchange For reasons outlined in the previous commit, we want to make the code for consuming "stream bundles" reusable. This patch extracts the code into a standalone function. diff -r 41965bf23295 -r 5095059340dc mercurial/exchange.py --- a/mercurial/exchange.py Thu May 21 10:27:22 2015 -0700 +++ b/mercurial/exchange.py Thu May 21 10:27:45 2015 -0700 @@ -5,6 +5,7 @@ # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. +import time from i18n import _ from node import hex, nullid import errno, urllib @@ -1397,3 +1398,66 @@ yield chunk finally: sopener.mustaudit = oldaudit + +def consumestreamclone(repo, fp): + """Apply the contents from a streaming clone file. + + This takes the output from "streamout" and applies it to the specified + repository. + + Like "streamout," the status line added by the wire protocol is not handled + by this function. + """ + lock = repo.lock() + try: + repo.ui.status(_('streaming all changes\n')) + l = fp.readline() + try: + total_files, total_bytes = map(int, l.split(' ', 1)) + except (ValueError, TypeError): + raise error.ResponseError( + _('unexpected response from remote server:'), l) + repo.ui.status(_('%d files to transfer, %s of data\n') % + (total_files, util.bytecount(total_bytes))) + handled_bytes = 0 + repo.ui.progress(_('clone'), 0, total=total_bytes) + start = time.time() + + tr = repo.transaction(_('clone')) + try: + for i in xrange(total_files): + # XXX doesn't support '\n' or '\r' in filenames + l = fp.readline() + try: + name, size = l.split('\0', 1) + size = int(size) + except (ValueError, TypeError): + raise error.ResponseError( + _('unexpected response from remote server:'), l) + if repo.ui.debugflag: + repo.ui.debug('adding %s (%s)\n' % + (name, util.bytecount(size))) + # for backwards compat, name was partially encoded + ofp = repo.svfs(store.decodedir(name), 'w') + for chunk in util.filechunkiter(fp, limit=size): + handled_bytes += len(chunk) + repo.ui.progress(_('clone'), handled_bytes, + total=total_bytes) + ofp.write(chunk) + ofp.close() + tr.close() + finally: + tr.release() + + # Writing straight to files circumvented the inmemory caches + repo.invalidate() + + elapsed = time.time() - start + if elapsed <= 0: + elapsed = 0.001 + repo.ui.progress(_('clone'), None) + repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % + (util.bytecount(total_bytes), elapsed, + util.bytecount(total_bytes / elapsed))) + finally: + lock.release() diff -r 41965bf23295 -r 5095059340dc mercurial/localrepo.py --- a/mercurial/localrepo.py Thu May 21 10:27:22 2015 -0700 +++ b/mercurial/localrepo.py Thu May 21 10:27:45 2015 -0700 @@ -1776,55 +1776,8 @@ raise util.Abort(_('locking the remote repository failed')) elif resp != 0: raise util.Abort(_('the server sent an unknown error code')) - self.ui.status(_('streaming all changes\n')) - l = fp.readline() - try: - total_files, total_bytes = map(int, l.split(' ', 1)) - except (ValueError, TypeError): - raise error.ResponseError( - _('unexpected response from remote server:'), l) - self.ui.status(_('%d files to transfer, %s of data\n') % - (total_files, util.bytecount(total_bytes))) - handled_bytes = 0 - self.ui.progress(_('clone'), 0, total=total_bytes) - start = time.time() - tr = self.transaction(_('clone')) - try: - for i in xrange(total_files): - # XXX doesn't support '\n' or '\r' in filenames - l = fp.readline() - try: - name, size = l.split('\0', 1) - size = int(size) - except (ValueError, TypeError): - raise error.ResponseError( - _('unexpected response from remote server:'), l) - if self.ui.debugflag: - self.ui.debug('adding %s (%s)\n' % - (name, util.bytecount(size))) - # for backwards compat, name was partially encoded - ofp = self.svfs(store.decodedir(name), 'w') - for chunk in util.filechunkiter(fp, limit=size): - handled_bytes += len(chunk) - self.ui.progress(_('clone'), handled_bytes, - total=total_bytes) - ofp.write(chunk) - ofp.close() - tr.close() - finally: - tr.release() - - # Writing straight to files circumvented the inmemory caches - self.invalidate() - - elapsed = time.time() - start - if elapsed <= 0: - elapsed = 0.001 - self.ui.progress(_('clone'), None) - self.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % - (util.bytecount(total_bytes), elapsed, - util.bytecount(total_bytes / elapsed))) + exchange.consumestreamclone(self, fp) # new requirements = old non-format requirements + # new format-related remote requirements