Mercurial > hg
comparison hgext/remotefilelog/fileserverclient.py @ 43076:2372284d9457
formatting: blacken the codebase
This is using my patch to black
(https://github.com/psf/black/pull/826) so we don't un-wrap collection
literals.
Done with:
hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S
# skip-blame mass-reformatting only
# no-check-commit reformats foo_bar functions
Differential Revision: https://phab.mercurial-scm.org/D6971
author | Augie Fackler <augie@google.com> |
---|---|
date | Sun, 06 Oct 2019 09:45:02 -0400 |
parents | fdeb4c1d23d5 |
children | 687b865b95ad |
comparison
equal
deleted
inserted
replaced
43075:57875cf423c9 | 43076:2372284d9457 |
---|---|
41 fetched = 0 | 41 fetched = 0 |
42 fetchmisses = 0 | 42 fetchmisses = 0 |
43 | 43 |
44 _lfsmod = None | 44 _lfsmod = None |
45 | 45 |
46 | |
46 def getcachekey(reponame, file, id): | 47 def getcachekey(reponame, file, id): |
47 pathhash = node.hex(hashlib.sha1(file).digest()) | 48 pathhash = node.hex(hashlib.sha1(file).digest()) |
48 return os.path.join(reponame, pathhash[:2], pathhash[2:], id) | 49 return os.path.join(reponame, pathhash[:2], pathhash[2:], id) |
49 | 50 |
51 | |
50 def getlocalkey(file, id): | 52 def getlocalkey(file, id): |
51 pathhash = node.hex(hashlib.sha1(file).digest()) | 53 pathhash = node.hex(hashlib.sha1(file).digest()) |
52 return os.path.join(pathhash, id) | 54 return os.path.join(pathhash, id) |
53 | 55 |
56 | |
54 def peersetup(ui, peer): | 57 def peersetup(ui, peer): |
55 | |
56 class remotefilepeer(peer.__class__): | 58 class remotefilepeer(peer.__class__): |
57 @wireprotov1peer.batchable | 59 @wireprotov1peer.batchable |
58 def x_rfl_getfile(self, file, node): | 60 def x_rfl_getfile(self, file, node): |
59 if not self.capable('x_rfl_getfile'): | 61 if not self.capable('x_rfl_getfile'): |
60 raise error.Abort( | 62 raise error.Abort( |
61 'configured remotefile server does not support getfile') | 63 'configured remotefile server does not support getfile' |
64 ) | |
62 f = wireprotov1peer.future() | 65 f = wireprotov1peer.future() |
63 yield {'file': file, 'node': node}, f | 66 yield {'file': file, 'node': node}, f |
64 code, data = f.value.split('\0', 1) | 67 code, data = f.value.split('\0', 1) |
65 if int(code): | 68 if int(code): |
66 raise error.LookupError(file, node, data) | 69 raise error.LookupError(file, node, data) |
67 yield data | 70 yield data |
68 | 71 |
69 @wireprotov1peer.batchable | 72 @wireprotov1peer.batchable |
70 def x_rfl_getflogheads(self, path): | 73 def x_rfl_getflogheads(self, path): |
71 if not self.capable('x_rfl_getflogheads'): | 74 if not self.capable('x_rfl_getflogheads'): |
72 raise error.Abort('configured remotefile server does not ' | 75 raise error.Abort( |
73 'support getflogheads') | 76 'configured remotefile server does not ' |
77 'support getflogheads' | |
78 ) | |
74 f = wireprotov1peer.future() | 79 f = wireprotov1peer.future() |
75 yield {'path': path}, f | 80 yield {'path': path}, f |
76 heads = f.value.split('\n') if f.value else [] | 81 heads = f.value.split('\n') if f.value else [] |
77 yield heads | 82 yield heads |
78 | 83 |
79 def _updatecallstreamopts(self, command, opts): | 84 def _updatecallstreamopts(self, command, opts): |
80 if command != 'getbundle': | 85 if command != 'getbundle': |
81 return | 86 return |
82 if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES | 87 if ( |
83 not in self.capabilities()): | 88 constants.NETWORK_CAP_LEGACY_SSH_GETFILES |
89 not in self.capabilities() | |
90 ): | |
84 return | 91 return |
85 if not util.safehasattr(self, '_localrepo'): | 92 if not util.safehasattr(self, '_localrepo'): |
86 return | 93 return |
87 if (constants.SHALLOWREPO_REQUIREMENT | 94 if ( |
88 not in self._localrepo.requirements): | 95 constants.SHALLOWREPO_REQUIREMENT |
96 not in self._localrepo.requirements | |
97 ): | |
89 return | 98 return |
90 | 99 |
91 bundlecaps = opts.get('bundlecaps') | 100 bundlecaps = opts.get('bundlecaps') |
92 if bundlecaps: | 101 if bundlecaps: |
93 bundlecaps = [bundlecaps] | 102 bundlecaps = [bundlecaps] |
112 bundlecaps.append(excludecap) | 121 bundlecaps.append(excludecap) |
113 opts['bundlecaps'] = ','.join(bundlecaps) | 122 opts['bundlecaps'] = ','.join(bundlecaps) |
114 | 123 |
115 def _sendrequest(self, command, args, **opts): | 124 def _sendrequest(self, command, args, **opts): |
116 self._updatecallstreamopts(command, args) | 125 self._updatecallstreamopts(command, args) |
117 return super(remotefilepeer, self)._sendrequest(command, args, | 126 return super(remotefilepeer, self)._sendrequest( |
118 **opts) | 127 command, args, **opts |
128 ) | |
119 | 129 |
120 def _callstream(self, command, **opts): | 130 def _callstream(self, command, **opts): |
121 supertype = super(remotefilepeer, self) | 131 supertype = super(remotefilepeer, self) |
122 if not util.safehasattr(supertype, '_sendrequest'): | 132 if not util.safehasattr(supertype, '_sendrequest'): |
123 self._updatecallstreamopts(command, pycompat.byteskwargs(opts)) | 133 self._updatecallstreamopts(command, pycompat.byteskwargs(opts)) |
124 return super(remotefilepeer, self)._callstream(command, **opts) | 134 return super(remotefilepeer, self)._callstream(command, **opts) |
125 | 135 |
126 peer.__class__ = remotefilepeer | 136 peer.__class__ = remotefilepeer |
127 | 137 |
138 | |
128 class cacheconnection(object): | 139 class cacheconnection(object): |
129 """The connection for communicating with the remote cache. Performs | 140 """The connection for communicating with the remote cache. Performs |
130 gets and sets by communicating with an external process that has the | 141 gets and sets by communicating with an external process that has the |
131 cache-specific implementation. | 142 cache-specific implementation. |
132 """ | 143 """ |
144 | |
133 def __init__(self): | 145 def __init__(self): |
134 self.pipeo = self.pipei = self.pipee = None | 146 self.pipeo = self.pipei = self.pipee = None |
135 self.subprocess = None | 147 self.subprocess = None |
136 self.connected = False | 148 self.connected = False |
137 | 149 |
138 def connect(self, cachecommand): | 150 def connect(self, cachecommand): |
139 if self.pipeo: | 151 if self.pipeo: |
140 raise error.Abort(_("cache connection already open")) | 152 raise error.Abort(_("cache connection already open")) |
141 self.pipei, self.pipeo, self.pipee, self.subprocess = ( | 153 self.pipei, self.pipeo, self.pipee, self.subprocess = procutil.popen4( |
142 procutil.popen4(cachecommand)) | 154 cachecommand |
155 ) | |
143 self.connected = True | 156 self.connected = True |
144 | 157 |
145 def close(self): | 158 def close(self): |
146 def tryclose(pipe): | 159 def tryclose(pipe): |
147 try: | 160 try: |
148 pipe.close() | 161 pipe.close() |
149 except Exception: | 162 except Exception: |
150 pass | 163 pass |
164 | |
151 if self.connected: | 165 if self.connected: |
152 try: | 166 try: |
153 self.pipei.write("exit\n") | 167 self.pipei.write("exit\n") |
154 except Exception: | 168 except Exception: |
155 pass | 169 pass |
188 except IOError: | 202 except IOError: |
189 self.close() | 203 self.close() |
190 | 204 |
191 return result | 205 return result |
192 | 206 |
207 | |
193 def _getfilesbatch( | 208 def _getfilesbatch( |
194 remote, receivemissing, progresstick, missed, idmap, batchsize): | 209 remote, receivemissing, progresstick, missed, idmap, batchsize |
210 ): | |
195 # Over http(s), iterbatch is a streamy method and we can start | 211 # Over http(s), iterbatch is a streamy method and we can start |
196 # looking at results early. This means we send one (potentially | 212 # looking at results early. This means we send one (potentially |
197 # large) request, but then we show nice progress as we process | 213 # large) request, but then we show nice progress as we process |
198 # file results, rather than showing chunks of $batchsize in | 214 # file results, rather than showing chunks of $batchsize in |
199 # progress. | 215 # progress. |
203 # should probably introduce a streambatch() method upstream and | 219 # should probably introduce a streambatch() method upstream and |
204 # use that for this. | 220 # use that for this. |
205 with remote.commandexecutor() as e: | 221 with remote.commandexecutor() as e: |
206 futures = [] | 222 futures = [] |
207 for m in missed: | 223 for m in missed: |
208 futures.append(e.callcommand('x_rfl_getfile', { | 224 futures.append( |
209 'file': idmap[m], | 225 e.callcommand( |
210 'node': m[-40:] | 226 'x_rfl_getfile', {'file': idmap[m], 'node': m[-40:]} |
211 })) | 227 ) |
228 ) | |
212 | 229 |
213 for i, m in enumerate(missed): | 230 for i, m in enumerate(missed): |
214 r = futures[i].result() | 231 r = futures[i].result() |
215 futures[i] = None # release memory | 232 futures[i] = None # release memory |
216 file_ = idmap[m] | 233 file_ = idmap[m] |
217 node = m[-40:] | 234 node = m[-40:] |
218 receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node) | 235 receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node) |
219 progresstick() | 236 progresstick() |
220 | 237 |
238 | |
221 def _getfiles_optimistic( | 239 def _getfiles_optimistic( |
222 remote, receivemissing, progresstick, missed, idmap, step): | 240 remote, receivemissing, progresstick, missed, idmap, step |
241 ): | |
223 remote._callstream("x_rfl_getfiles") | 242 remote._callstream("x_rfl_getfiles") |
224 i = 0 | 243 i = 0 |
225 pipeo = remote._pipeo | 244 pipeo = remote._pipeo |
226 pipei = remote._pipei | 245 pipei = remote._pipei |
227 while i < len(missed): | 246 while i < len(missed): |
246 | 265 |
247 # End the command | 266 # End the command |
248 pipeo.write('\n') | 267 pipeo.write('\n') |
249 pipeo.flush() | 268 pipeo.flush() |
250 | 269 |
270 | |
251 def _getfiles_threaded( | 271 def _getfiles_threaded( |
252 remote, receivemissing, progresstick, missed, idmap, step): | 272 remote, receivemissing, progresstick, missed, idmap, step |
273 ): | |
253 remote._callstream("getfiles") | 274 remote._callstream("getfiles") |
254 pipeo = remote._pipeo | 275 pipeo = remote._pipeo |
255 pipei = remote._pipei | 276 pipei = remote._pipei |
256 | 277 |
257 def writer(): | 278 def writer(): |
259 versionid = missingid[-40:] | 280 versionid = missingid[-40:] |
260 file = idmap[missingid] | 281 file = idmap[missingid] |
261 sshrequest = "%s%s\n" % (versionid, file) | 282 sshrequest = "%s%s\n" % (versionid, file) |
262 pipeo.write(sshrequest) | 283 pipeo.write(sshrequest) |
263 pipeo.flush() | 284 pipeo.flush() |
285 | |
264 writerthread = threading.Thread(target=writer) | 286 writerthread = threading.Thread(target=writer) |
265 writerthread.daemon = True | 287 writerthread.daemon = True |
266 writerthread.start() | 288 writerthread.start() |
267 | 289 |
268 for missingid in missed: | 290 for missingid in missed: |
274 writerthread.join() | 296 writerthread.join() |
275 # End the command | 297 # End the command |
276 pipeo.write('\n') | 298 pipeo.write('\n') |
277 pipeo.flush() | 299 pipeo.flush() |
278 | 300 |
301 | |
279 class fileserverclient(object): | 302 class fileserverclient(object): |
280 """A client for requesting files from the remote file server. | 303 """A client for requesting files from the remote file server. |
281 """ | 304 """ |
305 | |
282 def __init__(self, repo): | 306 def __init__(self, repo): |
283 ui = repo.ui | 307 ui = repo.ui |
284 self.repo = repo | 308 self.repo = repo |
285 self.ui = ui | 309 self.ui = ui |
286 self.cacheprocess = ui.config("remotefilelog", "cacheprocess") | 310 self.cacheprocess = ui.config("remotefilelog", "cacheprocess") |
288 self.cacheprocess = util.expandpath(self.cacheprocess) | 312 self.cacheprocess = util.expandpath(self.cacheprocess) |
289 | 313 |
290 # This option causes remotefilelog to pass the full file path to the | 314 # This option causes remotefilelog to pass the full file path to the |
291 # cacheprocess instead of a hashed key. | 315 # cacheprocess instead of a hashed key. |
292 self.cacheprocesspasspath = ui.configbool( | 316 self.cacheprocesspasspath = ui.configbool( |
293 "remotefilelog", "cacheprocess.includepath") | 317 "remotefilelog", "cacheprocess.includepath" |
318 ) | |
294 | 319 |
295 self.debugoutput = ui.configbool("remotefilelog", "debug") | 320 self.debugoutput = ui.configbool("remotefilelog", "debug") |
296 | 321 |
297 self.remotecache = cacheconnection() | 322 self.remotecache = cacheconnection() |
298 | 323 |
339 if not missingid: | 364 if not missingid: |
340 missedset = set(missed) | 365 missedset = set(missed) |
341 for missingid in idmap: | 366 for missingid in idmap: |
342 if not missingid in missedset: | 367 if not missingid in missedset: |
343 missed.append(missingid) | 368 missed.append(missingid) |
344 self.ui.warn(_("warning: cache connection closed early - " + | 369 self.ui.warn( |
345 "falling back to server\n")) | 370 _( |
371 "warning: cache connection closed early - " | |
372 + "falling back to server\n" | |
373 ) | |
374 ) | |
346 break | 375 break |
347 if missingid == "0": | 376 if missingid == "0": |
348 break | 377 break |
349 if missingid.startswith("_hits_"): | 378 if missingid.startswith("_hits_"): |
350 # receive progress reports | 379 # receive progress reports |
357 global fetchmisses | 386 global fetchmisses |
358 fetchmisses += len(missed) | 387 fetchmisses += len(missed) |
359 | 388 |
360 fromcache = total - len(missed) | 389 fromcache = total - len(missed) |
361 progress.update(fromcache, total=total) | 390 progress.update(fromcache, total=total) |
362 self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n", | 391 self.ui.log( |
363 fromcache, total, hit=fromcache, total=total) | 392 "remotefilelog", |
393 "remote cache hit rate is %r of %r\n", | |
394 fromcache, | |
395 total, | |
396 hit=fromcache, | |
397 total=total, | |
398 ) | |
364 | 399 |
365 oldumask = os.umask(0o002) | 400 oldumask = os.umask(0o002) |
366 try: | 401 try: |
367 # receive cache misses from master | 402 # receive cache misses from master |
368 if missed: | 403 if missed: |
373 self.ui.verbose = False | 408 self.ui.verbose = False |
374 try: | 409 try: |
375 with self._connect() as conn: | 410 with self._connect() as conn: |
376 remote = conn.peer | 411 remote = conn.peer |
377 if remote.capable( | 412 if remote.capable( |
378 constants.NETWORK_CAP_LEGACY_SSH_GETFILES): | 413 constants.NETWORK_CAP_LEGACY_SSH_GETFILES |
414 ): | |
379 if not isinstance(remote, _sshv1peer): | 415 if not isinstance(remote, _sshv1peer): |
380 raise error.Abort('remotefilelog requires ssh ' | 416 raise error.Abort( |
381 'servers') | 417 'remotefilelog requires ssh ' 'servers' |
382 step = self.ui.configint('remotefilelog', | 418 ) |
383 'getfilesstep') | 419 step = self.ui.configint( |
384 getfilestype = self.ui.config('remotefilelog', | 420 'remotefilelog', 'getfilesstep' |
385 'getfilestype') | 421 ) |
422 getfilestype = self.ui.config( | |
423 'remotefilelog', 'getfilestype' | |
424 ) | |
386 if getfilestype == 'threaded': | 425 if getfilestype == 'threaded': |
387 _getfiles = _getfiles_threaded | 426 _getfiles = _getfiles_threaded |
388 else: | 427 else: |
389 _getfiles = _getfiles_optimistic | 428 _getfiles = _getfiles_optimistic |
390 _getfiles(remote, self.receivemissing, | 429 _getfiles( |
391 progress.increment, missed, idmap, step) | 430 remote, |
431 self.receivemissing, | |
432 progress.increment, | |
433 missed, | |
434 idmap, | |
435 step, | |
436 ) | |
392 elif remote.capable("x_rfl_getfile"): | 437 elif remote.capable("x_rfl_getfile"): |
393 if remote.capable('batch'): | 438 if remote.capable('batch'): |
394 batchdefault = 100 | 439 batchdefault = 100 |
395 else: | 440 else: |
396 batchdefault = 10 | 441 batchdefault = 10 |
397 batchsize = self.ui.configint( | 442 batchsize = self.ui.configint( |
398 'remotefilelog', 'batchsize', batchdefault) | 443 'remotefilelog', 'batchsize', batchdefault |
444 ) | |
399 self.ui.debug( | 445 self.ui.debug( |
400 b'requesting %d files from ' | 446 b'requesting %d files from ' |
401 b'remotefilelog server...\n' % len(missed)) | 447 b'remotefilelog server...\n' % len(missed) |
448 ) | |
402 _getfilesbatch( | 449 _getfilesbatch( |
403 remote, self.receivemissing, progress.increment, | 450 remote, |
404 missed, idmap, batchsize) | 451 self.receivemissing, |
452 progress.increment, | |
453 missed, | |
454 idmap, | |
455 batchsize, | |
456 ) | |
405 else: | 457 else: |
406 raise error.Abort("configured remotefilelog server" | 458 raise error.Abort( |
407 " does not support remotefilelog") | 459 "configured remotefilelog server" |
408 | 460 " does not support remotefilelog" |
409 self.ui.log("remotefilefetchlog", | 461 ) |
410 "Success\n", | 462 |
411 fetched_files = progress.pos - fromcache, | 463 self.ui.log( |
412 total_to_fetch = total - fromcache) | 464 "remotefilefetchlog", |
465 "Success\n", | |
466 fetched_files=progress.pos - fromcache, | |
467 total_to_fetch=total - fromcache, | |
468 ) | |
413 except Exception: | 469 except Exception: |
414 self.ui.log("remotefilefetchlog", | 470 self.ui.log( |
415 "Fail\n", | 471 "remotefilefetchlog", |
416 fetched_files = progress.pos - fromcache, | 472 "Fail\n", |
417 total_to_fetch = total - fromcache) | 473 fetched_files=progress.pos - fromcache, |
474 total_to_fetch=total - fromcache, | |
475 ) | |
418 raise | 476 raise |
419 finally: | 477 finally: |
420 self.ui.verbose = verbose | 478 self.ui.verbose = verbose |
421 # send to memcache | 479 # send to memcache |
422 request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed)) | 480 request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed)) |
430 os.umask(oldumask) | 488 os.umask(oldumask) |
431 | 489 |
432 def receivemissing(self, pipe, filename, node): | 490 def receivemissing(self, pipe, filename, node): |
433 line = pipe.readline()[:-1] | 491 line = pipe.readline()[:-1] |
434 if not line: | 492 if not line: |
435 raise error.ResponseError(_("error downloading file contents:"), | 493 raise error.ResponseError( |
436 _("connection closed early")) | 494 _("error downloading file contents:"), |
495 _("connection closed early"), | |
496 ) | |
437 size = int(line) | 497 size = int(line) |
438 data = pipe.read(size) | 498 data = pipe.read(size) |
439 if len(data) != size: | 499 if len(data) != size: |
440 raise error.ResponseError(_("error downloading file contents:"), | 500 raise error.ResponseError( |
441 _("only received %s of %s bytes") | 501 _("error downloading file contents:"), |
442 % (len(data), size)) | 502 _("only received %s of %s bytes") % (len(data), size), |
443 | 503 ) |
444 self.writedata.addremotefilelognode(filename, bin(node), | 504 |
445 zlib.decompress(data)) | 505 self.writedata.addremotefilelognode( |
506 filename, bin(node), zlib.decompress(data) | |
507 ) | |
446 | 508 |
447 def connect(self): | 509 def connect(self): |
448 if self.cacheprocess: | 510 if self.cacheprocess: |
449 cmd = "%s %s" % (self.cacheprocess, self.writedata._path) | 511 cmd = "%s %s" % (self.cacheprocess, self.writedata._path) |
450 self.remotecache.connect(cmd) | 512 self.remotecache.connect(cmd) |
475 | 537 |
476 self.remotecache = simplecache() | 538 self.remotecache = simplecache() |
477 | 539 |
478 def close(self): | 540 def close(self): |
479 if fetches: | 541 if fetches: |
480 msg = ("%d files fetched over %d fetches - " + | 542 msg = ( |
481 "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % ( | 543 "%d files fetched over %d fetches - " |
482 fetched, | 544 + "(%d misses, %0.2f%% hit ratio) over %0.2fs\n" |
483 fetches, | 545 ) % ( |
484 fetchmisses, | 546 fetched, |
485 float(fetched - fetchmisses) / float(fetched) * 100.0, | 547 fetches, |
486 fetchcost) | 548 fetchmisses, |
549 float(fetched - fetchmisses) / float(fetched) * 100.0, | |
550 fetchcost, | |
551 ) | |
487 if self.debugoutput: | 552 if self.debugoutput: |
488 self.ui.warn(msg) | 553 self.ui.warn(msg) |
489 self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"), | 554 self.ui.log( |
555 "remotefilelog.prefetch", | |
556 msg.replace("%", "%%"), | |
490 remotefilelogfetched=fetched, | 557 remotefilelogfetched=fetched, |
491 remotefilelogfetches=fetches, | 558 remotefilelogfetches=fetches, |
492 remotefilelogfetchmisses=fetchmisses, | 559 remotefilelogfetchmisses=fetchmisses, |
493 remotefilelogfetchtime=fetchcost * 1000) | 560 remotefilelogfetchtime=fetchcost * 1000, |
561 ) | |
494 | 562 |
495 if self.remotecache.connected: | 563 if self.remotecache.connected: |
496 self.remotecache.close() | 564 self.remotecache.close() |
497 | 565 |
498 def prefetch(self, fileids, force=False, fetchdata=True, | 566 def prefetch( |
499 fetchhistory=False): | 567 self, fileids, force=False, fetchdata=True, fetchhistory=False |
568 ): | |
500 """downloads the given file versions to the cache | 569 """downloads the given file versions to the cache |
501 """ | 570 """ |
502 repo = self.repo | 571 repo = self.repo |
503 idstocheck = [] | 572 idstocheck = [] |
504 for file, id in fileids: | 573 for file, id in fileids: |
505 # hack | 574 # hack |
506 # - we don't use .hgtags | 575 # - we don't use .hgtags |
507 # - workingctx produces ids with length 42, | 576 # - workingctx produces ids with length 42, |
508 # which we skip since they aren't in any cache | 577 # which we skip since they aren't in any cache |
509 if (file == '.hgtags' or len(id) == 42 | 578 if ( |
510 or not repo.shallowmatch(file)): | 579 file == '.hgtags' |
580 or len(id) == 42 | |
581 or not repo.shallowmatch(file) | |
582 ): | |
511 continue | 583 continue |
512 | 584 |
513 idstocheck.append((file, bin(id))) | 585 idstocheck.append((file, bin(id))) |
514 | 586 |
515 datastore = self.datastore | 587 datastore = self.datastore |
516 historystore = self.historystore | 588 historystore = self.historystore |
517 if force: | 589 if force: |
518 datastore = contentstore.unioncontentstore(*repo.shareddatastores) | 590 datastore = contentstore.unioncontentstore(*repo.shareddatastores) |
519 historystore = metadatastore.unionmetadatastore( | 591 historystore = metadatastore.unionmetadatastore( |
520 *repo.sharedhistorystores) | 592 *repo.sharedhistorystores |
593 ) | |
521 | 594 |
522 missingids = set() | 595 missingids = set() |
523 if fetchdata: | 596 if fetchdata: |
524 missingids.update(datastore.getmissing(idstocheck)) | 597 missingids.update(datastore.getmissing(idstocheck)) |
525 if fetchhistory: | 598 if fetchhistory: |
529 # warn about this filtering potentially shadowing bugs. | 602 # warn about this filtering potentially shadowing bugs. |
530 nullids = len([None for unused, id in missingids if id == nullid]) | 603 nullids = len([None for unused, id in missingids if id == nullid]) |
531 if nullids: | 604 if nullids: |
532 missingids = [(f, id) for f, id in missingids if id != nullid] | 605 missingids = [(f, id) for f, id in missingids if id != nullid] |
533 repo.ui.develwarn( | 606 repo.ui.develwarn( |
534 ('remotefilelog not fetching %d null revs' | 607 ( |
535 ' - this is likely hiding bugs' % nullids), | 608 'remotefilelog not fetching %d null revs' |
536 config='remotefilelog-ext') | 609 ' - this is likely hiding bugs' % nullids |
610 ), | |
611 config='remotefilelog-ext', | |
612 ) | |
537 if missingids: | 613 if missingids: |
538 global fetches, fetched, fetchcost | 614 global fetches, fetched, fetchcost |
539 fetches += 1 | 615 fetches += 1 |
540 | 616 |
541 # We want to be able to detect excess individual file downloads, so | 617 # We want to be able to detect excess individual file downloads, so |
542 # let's log that information for debugging. | 618 # let's log that information for debugging. |
543 if fetches >= 15 and fetches < 18: | 619 if fetches >= 15 and fetches < 18: |
544 if fetches == 15: | 620 if fetches == 15: |
545 fetchwarning = self.ui.config('remotefilelog', | 621 fetchwarning = self.ui.config( |
546 'fetchwarning') | 622 'remotefilelog', 'fetchwarning' |
623 ) | |
547 if fetchwarning: | 624 if fetchwarning: |
548 self.ui.warn(fetchwarning + '\n') | 625 self.ui.warn(fetchwarning + '\n') |
549 self.logstacktrace() | 626 self.logstacktrace() |
550 missingids = [(file, hex(id)) for file, id in sorted(missingids)] | 627 missingids = [(file, hex(id)) for file, id in sorted(missingids)] |
551 fetched += len(missingids) | 628 fetched += len(missingids) |
552 start = time.time() | 629 start = time.time() |
553 missingids = self.request(missingids) | 630 missingids = self.request(missingids) |
554 if missingids: | 631 if missingids: |
555 raise error.Abort(_("unable to download %d files") % | 632 raise error.Abort( |
556 len(missingids)) | 633 _("unable to download %d files") % len(missingids) |
634 ) | |
557 fetchcost += time.time() - start | 635 fetchcost += time.time() - start |
558 self._lfsprefetch(fileids) | 636 self._lfsprefetch(fileids) |
559 | 637 |
560 def _lfsprefetch(self, fileids): | 638 def _lfsprefetch(self, fileids): |
561 if not _lfsmod or not util.safehasattr( | 639 if not _lfsmod or not util.safehasattr( |
562 self.repo.svfs, 'lfslocalblobstore'): | 640 self.repo.svfs, 'lfslocalblobstore' |
641 ): | |
563 return | 642 return |
564 if not _lfsmod.wrapper.candownload(self.repo): | 643 if not _lfsmod.wrapper.candownload(self.repo): |
565 return | 644 return |
566 pointers = [] | 645 pointers = [] |
567 store = self.repo.svfs.lfslocalblobstore | 646 store = self.repo.svfs.lfslocalblobstore |
578 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store) | 657 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store) |
579 assert all(store.has(p.oid()) for p in pointers) | 658 assert all(store.has(p.oid()) for p in pointers) |
580 | 659 |
581 def logstacktrace(self): | 660 def logstacktrace(self): |
582 import traceback | 661 import traceback |
583 self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n', | 662 |
584 ''.join(traceback.format_stack())) | 663 self.ui.log( |
664 'remotefilelog', | |
665 'excess remotefilelog fetching:\n%s\n', | |
666 ''.join(traceback.format_stack()), | |
667 ) |