# Module-wide fetch statistics read by close(). prefetch() adds the number of
# file versions it requests to ``fetched``; ``fetchmisses`` is read for the
# hit-ratio report (its writer is not in the visible code).
fetched, fetchmisses = 0, 0

# Handle to the lfs extension module, presumably assigned elsewhere when lfs
# is loaded; _lfsprefetch() returns early while this is falsy.
_lfsmod = None
45 |
45 |
|
46 |
def getcachekey(reponame, file, id):
    """Return the shared-cache key for revision ``id`` of ``file``.

    The key is ``reponame/<h[:2]>/<h[2:]>/id``, where ``h`` is the
    ``node.hex`` encoding of the SHA-1 digest of the file path. The first two
    characters become their own path component (presumably to spread entries
    across subdirectories).
    """
    digest = hashlib.sha1(file).digest()
    hexpath = node.hex(digest)
    prefix, remainder = hexpath[:2], hexpath[2:]
    return os.path.join(reponame, prefix, remainder, id)
49 |
50 |
|
51 |
def getlocalkey(file, id):
    """Return the local key for revision ``id`` of ``file``.

    Unlike getcachekey() there is no repository prefix and the path hash is
    not split: the result is ``<node.hex(sha1(file))>/id``.
    """
    hexpath = node.hex(hashlib.sha1(file).digest())
    return os.path.join(hexpath, id)
53 |
55 |
|
56 |
54 def peersetup(ui, peer): |
57 def peersetup(ui, peer): |
55 |
|
56 class remotefilepeer(peer.__class__): |
58 class remotefilepeer(peer.__class__): |
57 @wireprotov1peer.batchable |
59 @wireprotov1peer.batchable |
58 def x_rfl_getfile(self, file, node): |
60 def x_rfl_getfile(self, file, node): |
59 if not self.capable('x_rfl_getfile'): |
61 if not self.capable('x_rfl_getfile'): |
60 raise error.Abort( |
62 raise error.Abort( |
61 'configured remotefile server does not support getfile') |
63 'configured remotefile server does not support getfile' |
|
64 ) |
62 f = wireprotov1peer.future() |
65 f = wireprotov1peer.future() |
63 yield {'file': file, 'node': node}, f |
66 yield {'file': file, 'node': node}, f |
64 code, data = f.value.split('\0', 1) |
67 code, data = f.value.split('\0', 1) |
65 if int(code): |
68 if int(code): |
66 raise error.LookupError(file, node, data) |
69 raise error.LookupError(file, node, data) |
67 yield data |
70 yield data |
68 |
71 |
69 @wireprotov1peer.batchable |
72 @wireprotov1peer.batchable |
70 def x_rfl_getflogheads(self, path): |
73 def x_rfl_getflogheads(self, path): |
71 if not self.capable('x_rfl_getflogheads'): |
74 if not self.capable('x_rfl_getflogheads'): |
72 raise error.Abort('configured remotefile server does not ' |
75 raise error.Abort( |
73 'support getflogheads') |
76 'configured remotefile server does not ' |
|
77 'support getflogheads' |
|
78 ) |
74 f = wireprotov1peer.future() |
79 f = wireprotov1peer.future() |
75 yield {'path': path}, f |
80 yield {'path': path}, f |
76 heads = f.value.split('\n') if f.value else [] |
81 heads = f.value.split('\n') if f.value else [] |
77 yield heads |
82 yield heads |
78 |
83 |
79 def _updatecallstreamopts(self, command, opts): |
84 def _updatecallstreamopts(self, command, opts): |
80 if command != 'getbundle': |
85 if command != 'getbundle': |
81 return |
86 return |
82 if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES |
87 if ( |
83 not in self.capabilities()): |
88 constants.NETWORK_CAP_LEGACY_SSH_GETFILES |
|
89 not in self.capabilities() |
|
90 ): |
84 return |
91 return |
85 if not util.safehasattr(self, '_localrepo'): |
92 if not util.safehasattr(self, '_localrepo'): |
86 return |
93 return |
87 if (constants.SHALLOWREPO_REQUIREMENT |
94 if ( |
88 not in self._localrepo.requirements): |
95 constants.SHALLOWREPO_REQUIREMENT |
|
96 not in self._localrepo.requirements |
|
97 ): |
89 return |
98 return |
90 |
99 |
91 bundlecaps = opts.get('bundlecaps') |
100 bundlecaps = opts.get('bundlecaps') |
92 if bundlecaps: |
101 if bundlecaps: |
93 bundlecaps = [bundlecaps] |
102 bundlecaps = [bundlecaps] |
112 bundlecaps.append(excludecap) |
121 bundlecaps.append(excludecap) |
113 opts['bundlecaps'] = ','.join(bundlecaps) |
122 opts['bundlecaps'] = ','.join(bundlecaps) |
114 |
123 |
115 def _sendrequest(self, command, args, **opts): |
124 def _sendrequest(self, command, args, **opts): |
116 self._updatecallstreamopts(command, args) |
125 self._updatecallstreamopts(command, args) |
117 return super(remotefilepeer, self)._sendrequest(command, args, |
126 return super(remotefilepeer, self)._sendrequest( |
118 **opts) |
127 command, args, **opts |
|
128 ) |
119 |
129 |
120 def _callstream(self, command, **opts): |
130 def _callstream(self, command, **opts): |
121 supertype = super(remotefilepeer, self) |
131 supertype = super(remotefilepeer, self) |
122 if not util.safehasattr(supertype, '_sendrequest'): |
132 if not util.safehasattr(supertype, '_sendrequest'): |
123 self._updatecallstreamopts(command, pycompat.byteskwargs(opts)) |
133 self._updatecallstreamopts(command, pycompat.byteskwargs(opts)) |
124 return super(remotefilepeer, self)._callstream(command, **opts) |
134 return super(remotefilepeer, self)._callstream(command, **opts) |
125 |
135 |
126 peer.__class__ = remotefilepeer |
136 peer.__class__ = remotefilepeer |
127 |
137 |
|
138 |
128 class cacheconnection(object): |
139 class cacheconnection(object): |
129 """The connection for communicating with the remote cache. Performs |
140 """The connection for communicating with the remote cache. Performs |
130 gets and sets by communicating with an external process that has the |
141 gets and sets by communicating with an external process that has the |
131 cache-specific implementation. |
142 cache-specific implementation. |
132 """ |
143 """ |
|
144 |
133 def __init__(self): |
145 def __init__(self): |
134 self.pipeo = self.pipei = self.pipee = None |
146 self.pipeo = self.pipei = self.pipee = None |
135 self.subprocess = None |
147 self.subprocess = None |
136 self.connected = False |
148 self.connected = False |
137 |
149 |
138 def connect(self, cachecommand): |
150 def connect(self, cachecommand): |
139 if self.pipeo: |
151 if self.pipeo: |
140 raise error.Abort(_("cache connection already open")) |
152 raise error.Abort(_("cache connection already open")) |
141 self.pipei, self.pipeo, self.pipee, self.subprocess = ( |
153 self.pipei, self.pipeo, self.pipee, self.subprocess = procutil.popen4( |
142 procutil.popen4(cachecommand)) |
154 cachecommand |
|
155 ) |
143 self.connected = True |
156 self.connected = True |
144 |
157 |
145 def close(self): |
158 def close(self): |
146 def tryclose(pipe): |
159 def tryclose(pipe): |
147 try: |
160 try: |
148 pipe.close() |
161 pipe.close() |
149 except Exception: |
162 except Exception: |
150 pass |
163 pass |
|
164 |
151 if self.connected: |
165 if self.connected: |
152 try: |
166 try: |
153 self.pipei.write("exit\n") |
167 self.pipei.write("exit\n") |
154 except Exception: |
168 except Exception: |
155 pass |
169 pass |
188 except IOError: |
202 except IOError: |
189 self.close() |
203 self.close() |
190 |
204 |
191 return result |
205 return result |
192 |
206 |
|
207 |
def _getfilesbatch(
    remote, receivemissing, progresstick, missed, idmap, batchsize
):
    """Fetch every key in ``missed`` with ``x_rfl_getfile`` commands.

    All commands are queued on one command executor, then their results are
    consumed in the order of ``missed``. Each result is wrapped in an
    in-memory stream as ``<len>`` + newline + ``<data>`` and passed to
    ``receivemissing`` along with the file name (``idmap[key]``) and the last
    40 characters of the key (sent as the node). ``progresstick`` is called
    once per file.

    ``batchsize`` is accepted for interface compatibility but is not used
    here: requests are sent as a single batch so progress can be reported
    per file rather than in chunks of ``batchsize``.
    """
    # Over http(s) the batched results can be read as they arrive. Over ssh
    # the batch is not streamed; a dedicated streaming batch method upstream
    # would be the better long-term fit.
    with remote.commandexecutor() as executor:
        pending = [
            executor.callcommand(
                'x_rfl_getfile', {'file': idmap[key], 'node': key[-40:]}
            )
            for key in missed
        ]

        for idx, key in enumerate(missed):
            payload = pending[idx].result()
            # Drop our reference so each payload can be freed once handled.
            pending[idx] = None
            filename = idmap[key]
            filenode = key[-40:]
            framed = io.BytesIO('%d\n%s' % (len(payload), payload))
            receivemissing(framed, filename, filenode)
            progresstick()
220 |
237 |
|
238 |
221 def _getfiles_optimistic( |
239 def _getfiles_optimistic( |
222 remote, receivemissing, progresstick, missed, idmap, step): |
240 remote, receivemissing, progresstick, missed, idmap, step |
|
241 ): |
223 remote._callstream("x_rfl_getfiles") |
242 remote._callstream("x_rfl_getfiles") |
224 i = 0 |
243 i = 0 |
225 pipeo = remote._pipeo |
244 pipeo = remote._pipeo |
226 pipei = remote._pipei |
245 pipei = remote._pipei |
227 while i < len(missed): |
246 while i < len(missed): |
373 self.ui.verbose = False |
408 self.ui.verbose = False |
374 try: |
409 try: |
375 with self._connect() as conn: |
410 with self._connect() as conn: |
376 remote = conn.peer |
411 remote = conn.peer |
377 if remote.capable( |
412 if remote.capable( |
378 constants.NETWORK_CAP_LEGACY_SSH_GETFILES): |
413 constants.NETWORK_CAP_LEGACY_SSH_GETFILES |
|
414 ): |
379 if not isinstance(remote, _sshv1peer): |
415 if not isinstance(remote, _sshv1peer): |
380 raise error.Abort('remotefilelog requires ssh ' |
416 raise error.Abort( |
381 'servers') |
417 'remotefilelog requires ssh ' 'servers' |
382 step = self.ui.configint('remotefilelog', |
418 ) |
383 'getfilesstep') |
419 step = self.ui.configint( |
384 getfilestype = self.ui.config('remotefilelog', |
420 'remotefilelog', 'getfilesstep' |
385 'getfilestype') |
421 ) |
|
422 getfilestype = self.ui.config( |
|
423 'remotefilelog', 'getfilestype' |
|
424 ) |
386 if getfilestype == 'threaded': |
425 if getfilestype == 'threaded': |
387 _getfiles = _getfiles_threaded |
426 _getfiles = _getfiles_threaded |
388 else: |
427 else: |
389 _getfiles = _getfiles_optimistic |
428 _getfiles = _getfiles_optimistic |
390 _getfiles(remote, self.receivemissing, |
429 _getfiles( |
391 progress.increment, missed, idmap, step) |
430 remote, |
|
431 self.receivemissing, |
|
432 progress.increment, |
|
433 missed, |
|
434 idmap, |
|
435 step, |
|
436 ) |
392 elif remote.capable("x_rfl_getfile"): |
437 elif remote.capable("x_rfl_getfile"): |
393 if remote.capable('batch'): |
438 if remote.capable('batch'): |
394 batchdefault = 100 |
439 batchdefault = 100 |
395 else: |
440 else: |
396 batchdefault = 10 |
441 batchdefault = 10 |
397 batchsize = self.ui.configint( |
442 batchsize = self.ui.configint( |
398 'remotefilelog', 'batchsize', batchdefault) |
443 'remotefilelog', 'batchsize', batchdefault |
|
444 ) |
399 self.ui.debug( |
445 self.ui.debug( |
400 b'requesting %d files from ' |
446 b'requesting %d files from ' |
401 b'remotefilelog server...\n' % len(missed)) |
447 b'remotefilelog server...\n' % len(missed) |
|
448 ) |
402 _getfilesbatch( |
449 _getfilesbatch( |
403 remote, self.receivemissing, progress.increment, |
450 remote, |
404 missed, idmap, batchsize) |
451 self.receivemissing, |
|
452 progress.increment, |
|
453 missed, |
|
454 idmap, |
|
455 batchsize, |
|
456 ) |
405 else: |
457 else: |
406 raise error.Abort("configured remotefilelog server" |
458 raise error.Abort( |
407 " does not support remotefilelog") |
459 "configured remotefilelog server" |
408 |
460 " does not support remotefilelog" |
409 self.ui.log("remotefilefetchlog", |
461 ) |
410 "Success\n", |
462 |
411 fetched_files = progress.pos - fromcache, |
463 self.ui.log( |
412 total_to_fetch = total - fromcache) |
464 "remotefilefetchlog", |
|
465 "Success\n", |
|
466 fetched_files=progress.pos - fromcache, |
|
467 total_to_fetch=total - fromcache, |
|
468 ) |
413 except Exception: |
469 except Exception: |
414 self.ui.log("remotefilefetchlog", |
470 self.ui.log( |
415 "Fail\n", |
471 "remotefilefetchlog", |
416 fetched_files = progress.pos - fromcache, |
472 "Fail\n", |
417 total_to_fetch = total - fromcache) |
473 fetched_files=progress.pos - fromcache, |
|
474 total_to_fetch=total - fromcache, |
|
475 ) |
418 raise |
476 raise |
419 finally: |
477 finally: |
420 self.ui.verbose = verbose |
478 self.ui.verbose = verbose |
421 # send to memcache |
479 # send to memcache |
422 request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed)) |
480 request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed)) |
430 os.umask(oldumask) |
488 os.umask(oldumask) |
431 |
489 |
def receivemissing(self, pipe, filename, node):
    """Read one size-prefixed, zlib-compressed revision from ``pipe`` and store it.

    The stream must start with a decimal byte count on its own line (the
    final character of the line is stripped), followed by that many bytes of
    zlib data. The decompressed content is handed to
    ``self.writedata.addremotefilelognode`` with ``filename`` and
    ``bin(node)``.

    Raises error.ResponseError when the size line is empty (connection closed
    early) or when fewer bytes than announced are read. A non-numeric size
    line makes ``int()`` raise ValueError.
    """
    sizeline = pipe.readline()[:-1]
    if not sizeline:
        raise error.ResponseError(
            _("error downloading file contents:"),
            _("connection closed early"),
        )

    expected = int(sizeline)
    payload = pipe.read(expected)
    received = len(payload)
    if received != expected:
        raise error.ResponseError(
            _("error downloading file contents:"),
            _("only received %s of %s bytes") % (received, expected),
        )

    store = self.writedata.addremotefilelognode
    store(filename, bin(node), zlib.decompress(payload))
446 |
508 |
447 def connect(self): |
509 def connect(self): |
448 if self.cacheprocess: |
510 if self.cacheprocess: |
449 cmd = "%s %s" % (self.cacheprocess, self.writedata._path) |
511 cmd = "%s %s" % (self.cacheprocess, self.writedata._path) |
450 self.remotecache.connect(cmd) |
512 self.remotecache.connect(cmd) |
475 |
537 |
476 self.remotecache = simplecache() |
538 self.remotecache = simplecache() |
477 |
539 |
def close(self):
    """Report fetch statistics, then close the remote cache connection.

    When at least one fetch happened (module-level ``fetches``), this builds a
    summary of files fetched, fetch count, misses, hit ratio and cumulative
    fetch time. The summary is printed via ``ui.warn`` when
    ``self.debugoutput`` is set, and is always sent to ``ui.log`` under
    ``remotefilelog.prefetch`` with the raw counters as keyword fields. The
    remote cache is closed afterwards if it is connected.
    """
    if fetches:
        hitratio = float(fetched - fetchmisses) / float(fetched) * 100.0
        msg = (
            "%d files fetched over %d fetches - "
            "(%d misses, %0.2f%% hit ratio) over %0.2fs\n"
        ) % (fetched, fetches, fetchmisses, hitratio, fetchcost)

        if self.debugoutput:
            self.ui.warn(msg)

        # Percent signs in the already-formatted text are doubled, presumably
        # because ui.log applies %-formatting to its message again.
        self.ui.log(
            "remotefilelog.prefetch",
            msg.replace("%", "%%"),
            remotefilelogfetched=fetched,
            remotefilelogfetches=fetches,
            remotefilelogfetchmisses=fetchmisses,
            remotefilelogfetchtime=fetchcost * 1000,
        )

    if self.remotecache.connected:
        self.remotecache.close()
497 |
565 |
498 def prefetch(self, fileids, force=False, fetchdata=True, |
566 def prefetch( |
499 fetchhistory=False): |
567 self, fileids, force=False, fetchdata=True, fetchhistory=False |
|
568 ): |
500 """downloads the given file versions to the cache |
569 """downloads the given file versions to the cache |
501 """ |
570 """ |
502 repo = self.repo |
571 repo = self.repo |
503 idstocheck = [] |
572 idstocheck = [] |
504 for file, id in fileids: |
573 for file, id in fileids: |
505 # hack |
574 # hack |
506 # - we don't use .hgtags |
575 # - we don't use .hgtags |
507 # - workingctx produces ids with length 42, |
576 # - workingctx produces ids with length 42, |
508 # which we skip since they aren't in any cache |
577 # which we skip since they aren't in any cache |
509 if (file == '.hgtags' or len(id) == 42 |
578 if ( |
510 or not repo.shallowmatch(file)): |
579 file == '.hgtags' |
|
580 or len(id) == 42 |
|
581 or not repo.shallowmatch(file) |
|
582 ): |
511 continue |
583 continue |
512 |
584 |
513 idstocheck.append((file, bin(id))) |
585 idstocheck.append((file, bin(id))) |
514 |
586 |
515 datastore = self.datastore |
587 datastore = self.datastore |
516 historystore = self.historystore |
588 historystore = self.historystore |
517 if force: |
589 if force: |
518 datastore = contentstore.unioncontentstore(*repo.shareddatastores) |
590 datastore = contentstore.unioncontentstore(*repo.shareddatastores) |
519 historystore = metadatastore.unionmetadatastore( |
591 historystore = metadatastore.unionmetadatastore( |
520 *repo.sharedhistorystores) |
592 *repo.sharedhistorystores |
|
593 ) |
521 |
594 |
522 missingids = set() |
595 missingids = set() |
523 if fetchdata: |
596 if fetchdata: |
524 missingids.update(datastore.getmissing(idstocheck)) |
597 missingids.update(datastore.getmissing(idstocheck)) |
525 if fetchhistory: |
598 if fetchhistory: |
529 # warn about this filtering potentially shadowing bugs. |
602 # warn about this filtering potentially shadowing bugs. |
530 nullids = len([None for unused, id in missingids if id == nullid]) |
603 nullids = len([None for unused, id in missingids if id == nullid]) |
531 if nullids: |
604 if nullids: |
532 missingids = [(f, id) for f, id in missingids if id != nullid] |
605 missingids = [(f, id) for f, id in missingids if id != nullid] |
533 repo.ui.develwarn( |
606 repo.ui.develwarn( |
534 ('remotefilelog not fetching %d null revs' |
607 ( |
535 ' - this is likely hiding bugs' % nullids), |
608 'remotefilelog not fetching %d null revs' |
536 config='remotefilelog-ext') |
609 ' - this is likely hiding bugs' % nullids |
|
610 ), |
|
611 config='remotefilelog-ext', |
|
612 ) |
537 if missingids: |
613 if missingids: |
538 global fetches, fetched, fetchcost |
614 global fetches, fetched, fetchcost |
539 fetches += 1 |
615 fetches += 1 |
540 |
616 |
541 # We want to be able to detect excess individual file downloads, so |
617 # We want to be able to detect excess individual file downloads, so |
542 # let's log that information for debugging. |
618 # let's log that information for debugging. |
543 if fetches >= 15 and fetches < 18: |
619 if fetches >= 15 and fetches < 18: |
544 if fetches == 15: |
620 if fetches == 15: |
545 fetchwarning = self.ui.config('remotefilelog', |
621 fetchwarning = self.ui.config( |
546 'fetchwarning') |
622 'remotefilelog', 'fetchwarning' |
|
623 ) |
547 if fetchwarning: |
624 if fetchwarning: |
548 self.ui.warn(fetchwarning + '\n') |
625 self.ui.warn(fetchwarning + '\n') |
549 self.logstacktrace() |
626 self.logstacktrace() |
550 missingids = [(file, hex(id)) for file, id in sorted(missingids)] |
627 missingids = [(file, hex(id)) for file, id in sorted(missingids)] |
551 fetched += len(missingids) |
628 fetched += len(missingids) |
552 start = time.time() |
629 start = time.time() |
553 missingids = self.request(missingids) |
630 missingids = self.request(missingids) |
554 if missingids: |
631 if missingids: |
555 raise error.Abort(_("unable to download %d files") % |
632 raise error.Abort( |
556 len(missingids)) |
633 _("unable to download %d files") % len(missingids) |
|
634 ) |
557 fetchcost += time.time() - start |
635 fetchcost += time.time() - start |
558 self._lfsprefetch(fileids) |
636 self._lfsprefetch(fileids) |
559 |
637 |
560 def _lfsprefetch(self, fileids): |
638 def _lfsprefetch(self, fileids): |
561 if not _lfsmod or not util.safehasattr( |
639 if not _lfsmod or not util.safehasattr( |
562 self.repo.svfs, 'lfslocalblobstore'): |
640 self.repo.svfs, 'lfslocalblobstore' |
|
641 ): |
563 return |
642 return |
564 if not _lfsmod.wrapper.candownload(self.repo): |
643 if not _lfsmod.wrapper.candownload(self.repo): |
565 return |
644 return |
566 pointers = [] |
645 pointers = [] |
567 store = self.repo.svfs.lfslocalblobstore |
646 store = self.repo.svfs.lfslocalblobstore |