Mercurial > hg
changeset 35433:f98fac24b757
lfs: using workers in lfs prefetch
This significantly speeds up lfs prefetch. With fast network we are
seeing ~50% improvement of overall prefetch times
Because of worker's API in posix we do lose finegrained progress update and only
see progress when a file finished downloading.
Test Plan:
Run tests:
./run-tests.py -l test-lfs*
....
# Ran 4 tests, 0 skipped, 0 failed.
Run commands resulting in lfs prefetch e.g. hg sparse --enable-profile
Differential Revision: https://phab.mercurial-scm.org/D1568
author | Wojciech Lis <wlis@fb.com> |
---|---|
date | Mon, 11 Dec 2017 17:02:02 -0800 |
parents | 86b8cc1f244e |
children | 71446ca85813 |
files | hgext/lfs/blobstore.py tests/test-lfs-test-server.t |
diffstat | 2 files changed, 45 insertions(+), 34 deletions(-) [+] |
line wrap: on
line diff
--- a/hgext/lfs/blobstore.py Thu Nov 30 16:01:53 2017 -0800 +++ b/hgext/lfs/blobstore.py Mon Dec 11 17:02:02 2017 -0800 @@ -19,6 +19,7 @@ url as urlmod, util, vfs as vfsmod, + worker, ) from ..largefiles import lfutil @@ -205,7 +206,7 @@ return filteredobjects - def _basictransfer(self, obj, action, localstore, progress=None): + def _basictransfer(self, obj, action, localstore): """Download or upload a single object using basic transfer protocol obj: dict, an object description returned by batch API @@ -223,7 +224,7 @@ request = util.urlreq.request(href) if action == 'upload': # If uploading blobs, read data from local blobstore. - request.data = filewithprogress(localstore.vfs(oid), progress) + request.data = filewithprogress(localstore.vfs(oid), None) request.get_method = lambda: 'PUT' for k, v in headers: @@ -236,8 +237,6 @@ data = req.read(1048576) if not data: break - if action == 'download' and progress: - progress(len(data)) response += data except util.urlerr.httperror as ex: raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)') @@ -252,45 +251,51 @@ raise error.ProgrammingError('invalid Git-LFS action: %s' % action) response = self._batchrequest(pointers, action) - prunningsize = [0] objects = self._extractobjects(response, pointers, action) total = sum(x.get('size', 0) for x in objects) + sizes = {} + for obj in objects: + sizes[obj.get('oid')] = obj.get('size', 0) topic = {'upload': _('lfs uploading'), 'download': _('lfs downloading')}[action] if self.ui.verbose and len(objects) > 1: self.ui.write(_('lfs: need to transfer %d objects (%s)\n') % (len(objects), util.bytecount(total))) self.ui.progress(topic, 0, total=total) - def progress(size): - # advance progress bar by "size" bytes - prunningsize[0] += size - self.ui.progress(topic, prunningsize[0], total=total) - for obj in sorted(objects, key=lambda o: o.get('oid')): - objsize = obj.get('size', 0) + def transfer(chunk): + for obj in chunk: + objsize = obj.get('size', 0) + if self.ui.verbose: + if action == 'download': + msg = _('lfs: downloading %s (%s)\n') + elif action == 'upload': + msg = _('lfs: uploading %s (%s)\n') + self.ui.write(msg % (obj.get('oid'), + util.bytecount(objsize))) + retry = self.retry + while True: + try: + self._basictransfer(obj, action, localstore) + yield 1, obj.get('oid') + break + except Exception as ex: + if retry > 0: + if self.ui.verbose: + self.ui.write( + _('lfs: failed: %r (remaining retry %d)\n') + % (ex, retry)) + retry -= 1 + continue + raise + + oids = worker.worker(self.ui, 0.1, transfer, (), + sorted(objects, key=lambda o: o.get('oid'))) + processed = 0 + for _one, oid in oids: + processed += sizes[oid] + self.ui.progress(topic, processed, total=total) if self.ui.verbose: - if action == 'download': - msg = _('lfs: downloading %s (%s)\n') - elif action == 'upload': - msg = _('lfs: uploading %s (%s)\n') - self.ui.write(msg % (obj.get('oid'), util.bytecount(objsize))) - origrunningsize = prunningsize[0] - retry = self.retry - while True: - prunningsize[0] = origrunningsize - try: - self._basictransfer(obj, action, localstore, - progress=progress) - break - except Exception as ex: - if retry > 0: - if self.ui.verbose: - self.ui.write( - _('lfs: failed: %r (remaining retry %d)\n') - % (ex, retry)) - retry -= 1 - continue - raise - + self.ui.write(_('lfs: processed: %s\n') % oid) self.ui.progress(topic, pos=None, total=total) def __del__(self):
--- a/tests/test-lfs-test-server.t Thu Nov 30 16:01:53 2017 -0800 +++ b/tests/test-lfs-test-server.t Mon Dec 11 17:02:02 2017 -0800 @@ -43,6 +43,7 @@ pushing to ../repo2 searching for changes lfs: uploading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes) + lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b 1 changesets found uncompressed size of bundle content: * (changelog) (glob) @@ -60,6 +61,7 @@ resolving manifests getting a lfs: downloading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes) + lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b 1 files updated, 0 files merged, 0 files removed, 0 files unresolved When the server has some blobs already @@ -73,7 +75,9 @@ searching for changes lfs: need to transfer 2 objects (39 bytes) lfs: uploading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes) + lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 lfs: uploading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes) + lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 1 changesets found uncompressed size of bundle content: adding changesets @@ -88,8 +92,10 @@ getting b getting c lfs: downloading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes) + lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 getting d lfs: downloading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes) + lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 3 files updated, 0 files merged, 0 files removed, 0 files unresolved Check error message when the remote missed a blob: