changeset 35433:f98fac24b757

lfs: using workers in lfs prefetch This significantly speeds up lfs prefetch. With fast network we are seeing ~50% improvement of overall prefetch times Because of worker's API in posix we do lose finegrained progress update and only see progress when a file finished downloading. Test Plan: Run tests: ./run-tests.py -l test-lfs* .... # Ran 4 tests, 0 skipped, 0 failed. Run commands resulting in lfs prefetch e.g. hg sparse --enable-profile Differential Revision: https://phab.mercurial-scm.org/D1568
author Wojciech Lis <wlis@fb.com>
date Mon, 11 Dec 2017 17:02:02 -0800
parents 86b8cc1f244e
children 71446ca85813
files hgext/lfs/blobstore.py tests/test-lfs-test-server.t
diffstat 2 files changed, 45 insertions(+), 34 deletions(-) [+]
line wrap: on
line diff
--- a/hgext/lfs/blobstore.py	Thu Nov 30 16:01:53 2017 -0800
+++ b/hgext/lfs/blobstore.py	Mon Dec 11 17:02:02 2017 -0800
@@ -19,6 +19,7 @@
     url as urlmod,
     util,
     vfs as vfsmod,
+    worker,
 )
 
 from ..largefiles import lfutil
@@ -205,7 +206,7 @@
 
         return filteredobjects
 
-    def _basictransfer(self, obj, action, localstore, progress=None):
+    def _basictransfer(self, obj, action, localstore):
         """Download or upload a single object using basic transfer protocol
 
         obj: dict, an object description returned by batch API
@@ -223,7 +224,7 @@
         request = util.urlreq.request(href)
         if action == 'upload':
             # If uploading blobs, read data from local blobstore.
-            request.data = filewithprogress(localstore.vfs(oid), progress)
+            request.data = filewithprogress(localstore.vfs(oid), None)
             request.get_method = lambda: 'PUT'
 
         for k, v in headers:
@@ -236,8 +237,6 @@
                 data = req.read(1048576)
                 if not data:
                     break
-                if action == 'download' and progress:
-                    progress(len(data))
                 response += data
         except util.urlerr.httperror as ex:
             raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
@@ -252,45 +251,51 @@
             raise error.ProgrammingError('invalid Git-LFS action: %s' % action)
 
         response = self._batchrequest(pointers, action)
-        prunningsize = [0]
         objects = self._extractobjects(response, pointers, action)
         total = sum(x.get('size', 0) for x in objects)
+        sizes = {}
+        for obj in objects:
+            sizes[obj.get('oid')] = obj.get('size', 0)
         topic = {'upload': _('lfs uploading'),
                  'download': _('lfs downloading')}[action]
         if self.ui.verbose and len(objects) > 1:
             self.ui.write(_('lfs: need to transfer %d objects (%s)\n')
                           % (len(objects), util.bytecount(total)))
         self.ui.progress(topic, 0, total=total)
-        def progress(size):
-            # advance progress bar by "size" bytes
-            prunningsize[0] += size
-            self.ui.progress(topic, prunningsize[0], total=total)
-        for obj in sorted(objects, key=lambda o: o.get('oid')):
-            objsize = obj.get('size', 0)
+        def transfer(chunk):
+            for obj in chunk:
+                objsize = obj.get('size', 0)
+                if self.ui.verbose:
+                    if action == 'download':
+                        msg = _('lfs: downloading %s (%s)\n')
+                    elif action == 'upload':
+                        msg = _('lfs: uploading %s (%s)\n')
+                    self.ui.write(msg % (obj.get('oid'),
+                                  util.bytecount(objsize)))
+                retry = self.retry
+                while True:
+                    try:
+                        self._basictransfer(obj, action, localstore)
+                        yield 1, obj.get('oid')
+                        break
+                    except Exception as ex:
+                        if retry > 0:
+                            if self.ui.verbose:
+                                self.ui.write(
+                                    _('lfs: failed: %r (remaining retry %d)\n')
+                                    % (ex, retry))
+                            retry -= 1
+                            continue
+                        raise
+
+        oids = worker.worker(self.ui, 0.1, transfer, (),
+                             sorted(objects, key=lambda o: o.get('oid')))
+        processed = 0
+        for _one, oid in oids:
+            processed += sizes[oid]
+            self.ui.progress(topic, processed, total=total)
             if self.ui.verbose:
-                if action == 'download':
-                    msg = _('lfs: downloading %s (%s)\n')
-                elif action == 'upload':
-                    msg = _('lfs: uploading %s (%s)\n')
-                self.ui.write(msg % (obj.get('oid'), util.bytecount(objsize)))
-            origrunningsize = prunningsize[0]
-            retry = self.retry
-            while True:
-                prunningsize[0] = origrunningsize
-                try:
-                    self._basictransfer(obj, action, localstore,
-                                        progress=progress)
-                    break
-                except Exception as ex:
-                    if retry > 0:
-                        if self.ui.verbose:
-                            self.ui.write(
-                                _('lfs: failed: %r (remaining retry %d)\n')
-                                % (ex, retry))
-                        retry -= 1
-                        continue
-                    raise
-
+                self.ui.write(_('lfs: processed: %s\n') % oid)
         self.ui.progress(topic, pos=None, total=total)
 
     def __del__(self):
--- a/tests/test-lfs-test-server.t	Thu Nov 30 16:01:53 2017 -0800
+++ b/tests/test-lfs-test-server.t	Mon Dec 11 17:02:02 2017 -0800
@@ -43,6 +43,7 @@
   pushing to ../repo2
   searching for changes
   lfs: uploading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
+  lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b
   1 changesets found
   uncompressed size of bundle content:
        * (changelog) (glob)
@@ -60,6 +61,7 @@
   resolving manifests
   getting a
   lfs: downloading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
+  lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b
   1 files updated, 0 files merged, 0 files removed, 0 files unresolved
 
 When the server has some blobs already
@@ -73,7 +75,9 @@
   searching for changes
   lfs: need to transfer 2 objects (39 bytes)
   lfs: uploading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
+  lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19
   lfs: uploading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
+  lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998
   1 changesets found
   uncompressed size of bundle content:
   adding changesets
@@ -88,8 +92,10 @@
   getting b
   getting c
   lfs: downloading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
+  lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998
   getting d
   lfs: downloading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
+  lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19
   3 files updated, 0 files merged, 0 files removed, 0 files unresolved
 
 Check error message when the remote missed a blob: