lfs: using workers in lfs prefetch
This significantly speeds up lfs prefetch. With a fast network we are
seeing a ~50% improvement in overall prefetch times.
Because of the worker API on posix we lose fine-grained progress updates and
only see progress when a file finishes downloading.
Test Plan:
Run tests:
./run-tests.py -l test-lfs*
....
# Ran 4 tests, 0 skipped, 0 failed.
Run commands that result in an lfs prefetch, e.g. hg sparse --enable-profile
Differential Revision: https://phab.mercurial-scm.org/D1568
--- a/hgext/lfs/blobstore.py Thu Nov 30 16:01:53 2017 -0800
+++ b/hgext/lfs/blobstore.py Mon Dec 11 17:02:02 2017 -0800
@@ -19,6 +19,7 @@
url as urlmod,
util,
vfs as vfsmod,
+ worker,
)
from ..largefiles import lfutil
@@ -205,7 +206,7 @@
return filteredobjects
- def _basictransfer(self, obj, action, localstore, progress=None):
+ def _basictransfer(self, obj, action, localstore):
"""Download or upload a single object using basic transfer protocol
obj: dict, an object description returned by batch API
@@ -223,7 +224,7 @@
request = util.urlreq.request(href)
if action == 'upload':
# If uploading blobs, read data from local blobstore.
- request.data = filewithprogress(localstore.vfs(oid), progress)
+ request.data = filewithprogress(localstore.vfs(oid), None)
request.get_method = lambda: 'PUT'
for k, v in headers:
@@ -236,8 +237,6 @@
data = req.read(1048576)
if not data:
break
- if action == 'download' and progress:
- progress(len(data))
response += data
except util.urlerr.httperror as ex:
raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
@@ -252,45 +251,51 @@
raise error.ProgrammingError('invalid Git-LFS action: %s' % action)
response = self._batchrequest(pointers, action)
- prunningsize = [0]
objects = self._extractobjects(response, pointers, action)
total = sum(x.get('size', 0) for x in objects)
+ sizes = {}
+ for obj in objects:
+ sizes[obj.get('oid')] = obj.get('size', 0)
topic = {'upload': _('lfs uploading'),
'download': _('lfs downloading')}[action]
if self.ui.verbose and len(objects) > 1:
self.ui.write(_('lfs: need to transfer %d objects (%s)\n')
% (len(objects), util.bytecount(total)))
self.ui.progress(topic, 0, total=total)
- def progress(size):
- # advance progress bar by "size" bytes
- prunningsize[0] += size
- self.ui.progress(topic, prunningsize[0], total=total)
- for obj in sorted(objects, key=lambda o: o.get('oid')):
- objsize = obj.get('size', 0)
+ def transfer(chunk):
+ for obj in chunk:
+ objsize = obj.get('size', 0)
+ if self.ui.verbose:
+ if action == 'download':
+ msg = _('lfs: downloading %s (%s)\n')
+ elif action == 'upload':
+ msg = _('lfs: uploading %s (%s)\n')
+ self.ui.write(msg % (obj.get('oid'),
+ util.bytecount(objsize)))
+ retry = self.retry
+ while True:
+ try:
+ self._basictransfer(obj, action, localstore)
+ yield 1, obj.get('oid')
+ break
+ except Exception as ex:
+ if retry > 0:
+ if self.ui.verbose:
+ self.ui.write(
+ _('lfs: failed: %r (remaining retry %d)\n')
+ % (ex, retry))
+ retry -= 1
+ continue
+ raise
+
+ oids = worker.worker(self.ui, 0.1, transfer, (),
+ sorted(objects, key=lambda o: o.get('oid')))
+ processed = 0
+ for _one, oid in oids:
+ processed += sizes[oid]
+ self.ui.progress(topic, processed, total=total)
if self.ui.verbose:
- if action == 'download':
- msg = _('lfs: downloading %s (%s)\n')
- elif action == 'upload':
- msg = _('lfs: uploading %s (%s)\n')
- self.ui.write(msg % (obj.get('oid'), util.bytecount(objsize)))
- origrunningsize = prunningsize[0]
- retry = self.retry
- while True:
- prunningsize[0] = origrunningsize
- try:
- self._basictransfer(obj, action, localstore,
- progress=progress)
- break
- except Exception as ex:
- if retry > 0:
- if self.ui.verbose:
- self.ui.write(
- _('lfs: failed: %r (remaining retry %d)\n')
- % (ex, retry))
- retry -= 1
- continue
- raise
-
+ self.ui.write(_('lfs: processed: %s\n') % oid)
self.ui.progress(topic, pos=None, total=total)
def __del__(self):
--- a/tests/test-lfs-test-server.t Thu Nov 30 16:01:53 2017 -0800
+++ b/tests/test-lfs-test-server.t Mon Dec 11 17:02:02 2017 -0800
@@ -43,6 +43,7 @@
pushing to ../repo2
searching for changes
lfs: uploading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
+ lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b
1 changesets found
uncompressed size of bundle content:
* (changelog) (glob)
@@ -60,6 +61,7 @@
resolving manifests
getting a
lfs: downloading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
+ lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b
1 files updated, 0 files merged, 0 files removed, 0 files unresolved
When the server has some blobs already
@@ -73,7 +75,9 @@
searching for changes
lfs: need to transfer 2 objects (39 bytes)
lfs: uploading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
+ lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19
lfs: uploading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
+ lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998
1 changesets found
uncompressed size of bundle content:
adding changesets
@@ -88,8 +92,10 @@
getting b
getting c
lfs: downloading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
+ lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998
getting d
lfs: downloading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
+ lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19
3 files updated, 0 files merged, 0 files removed, 0 files unresolved
Check error message when the remote missed a blob: