Mercurial > hg
comparison hgext/lfs/blobstore.py @ 35433:f98fac24b757
lfs: using workers in lfs prefetch
This significantly speeds up lfs prefetch. With a fast network we are
seeing a ~50% improvement in overall prefetch times.
Because of the worker API on POSIX, we lose fine-grained progress updates and only
see progress when a file has finished downloading.
Test Plan:
Run tests:
./run-tests.py -l test-lfs*
....
# Ran 4 tests, 0 skipped, 0 failed.
Run commands that result in an lfs prefetch, e.g. hg sparse --enable-profile
Differential Revision: https://phab.mercurial-scm.org/D1568
author | Wojciech Lis <wlis@fb.com> |
---|---|
date | Mon, 11 Dec 2017 17:02:02 -0800 |
parents | c8edeb03ca94 |
children | e7bb5fc4570c |
comparison
equal
deleted
inserted
replaced
35432:86b8cc1f244e | 35433:f98fac24b757 |
---|---|
17 error, | 17 error, |
18 pathutil, | 18 pathutil, |
19 url as urlmod, | 19 url as urlmod, |
20 util, | 20 util, |
21 vfs as vfsmod, | 21 vfs as vfsmod, |
22 worker, | |
22 ) | 23 ) |
23 | 24 |
24 from ..largefiles import lfutil | 25 from ..largefiles import lfutil |
25 | 26 |
26 # 64 bytes for SHA256 | 27 # 64 bytes for SHA256 |
203 _('LFS server claims required objects do not exist:\n%s') | 204 _('LFS server claims required objects do not exist:\n%s') |
204 % '\n'.join(missing)) | 205 % '\n'.join(missing)) |
205 | 206 |
206 return filteredobjects | 207 return filteredobjects |
207 | 208 |
208 def _basictransfer(self, obj, action, localstore, progress=None): | 209 def _basictransfer(self, obj, action, localstore): |
209 """Download or upload a single object using basic transfer protocol | 210 """Download or upload a single object using basic transfer protocol |
210 | 211 |
211 obj: dict, an object description returned by batch API | 212 obj: dict, an object description returned by batch API |
212 action: string, one of ['upload', 'download'] | 213 action: string, one of ['upload', 'download'] |
213 localstore: blobstore.local | 214 localstore: blobstore.local |
221 headers = obj['actions'][action].get('header', {}).items() | 222 headers = obj['actions'][action].get('header', {}).items() |
222 | 223 |
223 request = util.urlreq.request(href) | 224 request = util.urlreq.request(href) |
224 if action == 'upload': | 225 if action == 'upload': |
225 # If uploading blobs, read data from local blobstore. | 226 # If uploading blobs, read data from local blobstore. |
226 request.data = filewithprogress(localstore.vfs(oid), progress) | 227 request.data = filewithprogress(localstore.vfs(oid), None) |
227 request.get_method = lambda: 'PUT' | 228 request.get_method = lambda: 'PUT' |
228 | 229 |
229 for k, v in headers: | 230 for k, v in headers: |
230 request.add_header(k, v) | 231 request.add_header(k, v) |
231 | 232 |
234 req = self.urlopener.open(request) | 235 req = self.urlopener.open(request) |
235 while True: | 236 while True: |
236 data = req.read(1048576) | 237 data = req.read(1048576) |
237 if not data: | 238 if not data: |
238 break | 239 break |
239 if action == 'download' and progress: | |
240 progress(len(data)) | |
241 response += data | 240 response += data |
242 except util.urlerr.httperror as ex: | 241 except util.urlerr.httperror as ex: |
243 raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)') | 242 raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)') |
244 % (ex, oid, action)) | 243 % (ex, oid, action)) |
245 | 244 |
250 def _batch(self, pointers, localstore, action): | 249 def _batch(self, pointers, localstore, action): |
251 if action not in ['upload', 'download']: | 250 if action not in ['upload', 'download']: |
252 raise error.ProgrammingError('invalid Git-LFS action: %s' % action) | 251 raise error.ProgrammingError('invalid Git-LFS action: %s' % action) |
253 | 252 |
254 response = self._batchrequest(pointers, action) | 253 response = self._batchrequest(pointers, action) |
255 prunningsize = [0] | |
256 objects = self._extractobjects(response, pointers, action) | 254 objects = self._extractobjects(response, pointers, action) |
257 total = sum(x.get('size', 0) for x in objects) | 255 total = sum(x.get('size', 0) for x in objects) |
256 sizes = {} | |
257 for obj in objects: | |
258 sizes[obj.get('oid')] = obj.get('size', 0) | |
258 topic = {'upload': _('lfs uploading'), | 259 topic = {'upload': _('lfs uploading'), |
259 'download': _('lfs downloading')}[action] | 260 'download': _('lfs downloading')}[action] |
260 if self.ui.verbose and len(objects) > 1: | 261 if self.ui.verbose and len(objects) > 1: |
261 self.ui.write(_('lfs: need to transfer %d objects (%s)\n') | 262 self.ui.write(_('lfs: need to transfer %d objects (%s)\n') |
262 % (len(objects), util.bytecount(total))) | 263 % (len(objects), util.bytecount(total))) |
263 self.ui.progress(topic, 0, total=total) | 264 self.ui.progress(topic, 0, total=total) |
264 def progress(size): | 265 def transfer(chunk): |
265 # advance progress bar by "size" bytes | 266 for obj in chunk: |
266 prunningsize[0] += size | 267 objsize = obj.get('size', 0) |
267 self.ui.progress(topic, prunningsize[0], total=total) | 268 if self.ui.verbose: |
268 for obj in sorted(objects, key=lambda o: o.get('oid')): | 269 if action == 'download': |
269 objsize = obj.get('size', 0) | 270 msg = _('lfs: downloading %s (%s)\n') |
271 elif action == 'upload': | |
272 msg = _('lfs: uploading %s (%s)\n') | |
273 self.ui.write(msg % (obj.get('oid'), | |
274 util.bytecount(objsize))) | |
275 retry = self.retry | |
276 while True: | |
277 try: | |
278 self._basictransfer(obj, action, localstore) | |
279 yield 1, obj.get('oid') | |
280 break | |
281 except Exception as ex: | |
282 if retry > 0: | |
283 if self.ui.verbose: | |
284 self.ui.write( | |
285 _('lfs: failed: %r (remaining retry %d)\n') | |
286 % (ex, retry)) | |
287 retry -= 1 | |
288 continue | |
289 raise | |
290 | |
291 oids = worker.worker(self.ui, 0.1, transfer, (), | |
292 sorted(objects, key=lambda o: o.get('oid'))) | |
293 processed = 0 | |
294 for _one, oid in oids: | |
295 processed += sizes[oid] | |
296 self.ui.progress(topic, processed, total=total) | |
270 if self.ui.verbose: | 297 if self.ui.verbose: |
271 if action == 'download': | 298 self.ui.write(_('lfs: processed: %s\n') % oid) |
272 msg = _('lfs: downloading %s (%s)\n') | |
273 elif action == 'upload': | |
274 msg = _('lfs: uploading %s (%s)\n') | |
275 self.ui.write(msg % (obj.get('oid'), util.bytecount(objsize))) | |
276 origrunningsize = prunningsize[0] | |
277 retry = self.retry | |
278 while True: | |
279 prunningsize[0] = origrunningsize | |
280 try: | |
281 self._basictransfer(obj, action, localstore, | |
282 progress=progress) | |
283 break | |
284 except Exception as ex: | |
285 if retry > 0: | |
286 if self.ui.verbose: | |
287 self.ui.write( | |
288 _('lfs: failed: %r (remaining retry %d)\n') | |
289 % (ex, retry)) | |
290 retry -= 1 | |
291 continue | |
292 raise | |
293 | |
294 self.ui.progress(topic, pos=None, total=total) | 299 self.ui.progress(topic, pos=None, total=total) |
295 | 300 |
296 def __del__(self): | 301 def __del__(self): |
297 # copied from mercurial/httppeer.py | 302 # copied from mercurial/httppeer.py |
298 urlopener = getattr(self, 'urlopener', None) | 303 urlopener = getattr(self, 'urlopener', None) |