comparison hgext/lfs/blobstore.py @ 35433:f98fac24b757

lfs: using workers in lfs prefetch. This significantly speeds up lfs prefetch. With a fast network we are seeing a ~50% improvement in overall prefetch times. Because of the worker API on POSIX, we lose fine-grained progress updates and only see progress when a file has finished downloading. Test Plan: Run tests: ./run-tests.py -l test-lfs* .... # Ran 4 tests, 0 skipped, 0 failed. Run commands that trigger an lfs prefetch, e.g. hg sparse --enable-profile. Differential Revision: https://phab.mercurial-scm.org/D1568
author Wojciech Lis <wlis@fb.com>
date Mon, 11 Dec 2017 17:02:02 -0800
parents c8edeb03ca94
children e7bb5fc4570c
comparison
equal deleted inserted replaced
35432:86b8cc1f244e 35433:f98fac24b757
17 error, 17 error,
18 pathutil, 18 pathutil,
19 url as urlmod, 19 url as urlmod,
20 util, 20 util,
21 vfs as vfsmod, 21 vfs as vfsmod,
22 worker,
22 ) 23 )
23 24
24 from ..largefiles import lfutil 25 from ..largefiles import lfutil
25 26
26 # 64 bytes for SHA256 27 # 64 bytes for SHA256
203 _('LFS server claims required objects do not exist:\n%s') 204 _('LFS server claims required objects do not exist:\n%s')
204 % '\n'.join(missing)) 205 % '\n'.join(missing))
205 206
206 return filteredobjects 207 return filteredobjects
207 208
208 def _basictransfer(self, obj, action, localstore, progress=None): 209 def _basictransfer(self, obj, action, localstore):
209 """Download or upload a single object using basic transfer protocol 210 """Download or upload a single object using basic transfer protocol
210 211
211 obj: dict, an object description returned by batch API 212 obj: dict, an object description returned by batch API
212 action: string, one of ['upload', 'download'] 213 action: string, one of ['upload', 'download']
213 localstore: blobstore.local 214 localstore: blobstore.local
221 headers = obj['actions'][action].get('header', {}).items() 222 headers = obj['actions'][action].get('header', {}).items()
222 223
223 request = util.urlreq.request(href) 224 request = util.urlreq.request(href)
224 if action == 'upload': 225 if action == 'upload':
225 # If uploading blobs, read data from local blobstore. 226 # If uploading blobs, read data from local blobstore.
226 request.data = filewithprogress(localstore.vfs(oid), progress) 227 request.data = filewithprogress(localstore.vfs(oid), None)
227 request.get_method = lambda: 'PUT' 228 request.get_method = lambda: 'PUT'
228 229
229 for k, v in headers: 230 for k, v in headers:
230 request.add_header(k, v) 231 request.add_header(k, v)
231 232
234 req = self.urlopener.open(request) 235 req = self.urlopener.open(request)
235 while True: 236 while True:
236 data = req.read(1048576) 237 data = req.read(1048576)
237 if not data: 238 if not data:
238 break 239 break
239 if action == 'download' and progress:
240 progress(len(data))
241 response += data 240 response += data
242 except util.urlerr.httperror as ex: 241 except util.urlerr.httperror as ex:
243 raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)') 242 raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
244 % (ex, oid, action)) 243 % (ex, oid, action))
245 244
250 def _batch(self, pointers, localstore, action): 249 def _batch(self, pointers, localstore, action):
251 if action not in ['upload', 'download']: 250 if action not in ['upload', 'download']:
252 raise error.ProgrammingError('invalid Git-LFS action: %s' % action) 251 raise error.ProgrammingError('invalid Git-LFS action: %s' % action)
253 252
254 response = self._batchrequest(pointers, action) 253 response = self._batchrequest(pointers, action)
255 prunningsize = [0]
256 objects = self._extractobjects(response, pointers, action) 254 objects = self._extractobjects(response, pointers, action)
257 total = sum(x.get('size', 0) for x in objects) 255 total = sum(x.get('size', 0) for x in objects)
256 sizes = {}
257 for obj in objects:
258 sizes[obj.get('oid')] = obj.get('size', 0)
258 topic = {'upload': _('lfs uploading'), 259 topic = {'upload': _('lfs uploading'),
259 'download': _('lfs downloading')}[action] 260 'download': _('lfs downloading')}[action]
260 if self.ui.verbose and len(objects) > 1: 261 if self.ui.verbose and len(objects) > 1:
261 self.ui.write(_('lfs: need to transfer %d objects (%s)\n') 262 self.ui.write(_('lfs: need to transfer %d objects (%s)\n')
262 % (len(objects), util.bytecount(total))) 263 % (len(objects), util.bytecount(total)))
263 self.ui.progress(topic, 0, total=total) 264 self.ui.progress(topic, 0, total=total)
264 def progress(size): 265 def transfer(chunk):
265 # advance progress bar by "size" bytes 266 for obj in chunk:
266 prunningsize[0] += size 267 objsize = obj.get('size', 0)
267 self.ui.progress(topic, prunningsize[0], total=total) 268 if self.ui.verbose:
268 for obj in sorted(objects, key=lambda o: o.get('oid')): 269 if action == 'download':
269 objsize = obj.get('size', 0) 270 msg = _('lfs: downloading %s (%s)\n')
271 elif action == 'upload':
272 msg = _('lfs: uploading %s (%s)\n')
273 self.ui.write(msg % (obj.get('oid'),
274 util.bytecount(objsize)))
275 retry = self.retry
276 while True:
277 try:
278 self._basictransfer(obj, action, localstore)
279 yield 1, obj.get('oid')
280 break
281 except Exception as ex:
282 if retry > 0:
283 if self.ui.verbose:
284 self.ui.write(
285 _('lfs: failed: %r (remaining retry %d)\n')
286 % (ex, retry))
287 retry -= 1
288 continue
289 raise
290
291 oids = worker.worker(self.ui, 0.1, transfer, (),
292 sorted(objects, key=lambda o: o.get('oid')))
293 processed = 0
294 for _one, oid in oids:
295 processed += sizes[oid]
296 self.ui.progress(topic, processed, total=total)
270 if self.ui.verbose: 297 if self.ui.verbose:
271 if action == 'download': 298 self.ui.write(_('lfs: processed: %s\n') % oid)
272 msg = _('lfs: downloading %s (%s)\n')
273 elif action == 'upload':
274 msg = _('lfs: uploading %s (%s)\n')
275 self.ui.write(msg % (obj.get('oid'), util.bytecount(objsize)))
276 origrunningsize = prunningsize[0]
277 retry = self.retry
278 while True:
279 prunningsize[0] = origrunningsize
280 try:
281 self._basictransfer(obj, action, localstore,
282 progress=progress)
283 break
284 except Exception as ex:
285 if retry > 0:
286 if self.ui.verbose:
287 self.ui.write(
288 _('lfs: failed: %r (remaining retry %d)\n')
289 % (ex, retry))
290 retry -= 1
291 continue
292 raise
293
294 self.ui.progress(topic, pos=None, total=total) 299 self.ui.progress(topic, pos=None, total=total)
295 300
296 def __del__(self): 301 def __del__(self):
297 # copied from mercurial/httppeer.py 302 # copied from mercurial/httppeer.py
298 urlopener = getattr(self, 'urlopener', None) 303 urlopener = getattr(self, 'urlopener', None)