433 |
435 |
434 # type of file to stream |
436 # type of file to stream |
435 _fileappend = 0 # append only file |
437 _fileappend = 0 # append only file |
436 _filefull = 1 # full snapshot file |
438 _filefull = 1 # full snapshot file |
437 |
439 |
|
440 # Source of the file |
|
441 _srcstore = 's' # store (svfs) |
|
442 _srccache = 'c' # cache (cache) |
|
443 |
438 # This is it's own function so extensions can override it. |
444 # This is it's own function so extensions can override it. |
439 def _walkstreamfullstorefiles(repo): |
445 def _walkstreamfullstorefiles(repo): |
440 """list snapshot file from the store""" |
446 """list snapshot file from the store""" |
441 fnames = [] |
447 fnames = [] |
442 if not repo.publishing(): |
448 if not repo.publishing(): |
443 fnames.append('phaseroots') |
449 fnames.append('phaseroots') |
444 return fnames |
450 return fnames |
445 |
451 |
def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files

    ``entry`` is a ``(src, name, ftype, data)`` tuple. Entries that are
    not full snapshots (``ftype != _filefull``) are returned unchanged;
    full-snapshot entries get their ``data`` slot replaced with the path
    of a temporary copy made via ``copy`` from the vfs selected by
    ``src`` in ``vfsmap``.
    """
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))
452 |
458 |
453 @contextlib.contextmanager |
459 @contextlib.contextmanager |
454 def maketempcopies(): |
460 def maketempcopies(): |
455 """return a function to temporary copy file""" |
461 """return a function to temporary copy file""" |
456 files = [] |
462 files = [] |
464 yield copy |
470 yield copy |
465 finally: |
471 finally: |
466 for tmp in files: |
472 for tmp in files: |
467 util.tryunlink(tmp) |
473 util.tryunlink(tmp) |
468 |
474 |
|
def _makemap(repo):
    """make a (src -> vfs) map for the repo

    Maps each one-byte source tag used in the stream (``_srcstore``,
    ``_srccache``) to the vfs that reads/writes files for that source.
    """
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap
|
486 |
469 def _emit(repo, entries, totalfilesize): |
487 def _emit(repo, entries, totalfilesize): |
470 """actually emit the stream bundle""" |
488 """actually emit the stream bundle""" |
471 vfs = repo.svfs |
489 vfsmap = _makemap(repo) |
472 progress = repo.ui.progress |
490 progress = repo.ui.progress |
473 progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes')) |
491 progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes')) |
474 with maketempcopies() as copy: |
492 with maketempcopies() as copy: |
475 try: |
493 try: |
476 # copy is delayed until we are in the try |
494 # copy is delayed until we are in the try |
477 entries = [_filterfull(e, copy, vfs) for e in entries] |
495 entries = [_filterfull(e, copy, vfsmap) for e in entries] |
478 yield None # this release the lock on the repository |
496 yield None # this release the lock on the repository |
479 seen = 0 |
497 seen = 0 |
480 |
498 |
481 for name, ftype, data in entries: |
499 for src, name, ftype, data in entries: |
|
500 vfs = vfsmap[src] |
|
501 yield src |
482 yield util.uvarintencode(len(name)) |
502 yield util.uvarintencode(len(name)) |
483 if ftype == _fileappend: |
503 if ftype == _fileappend: |
484 fp = vfs(name) |
504 fp = vfs(name) |
485 size = data |
505 size = data |
486 elif ftype == _filefull: |
506 elif ftype == _filefull: |
505 |
525 |
def generatev2(repo):
    """Emit content for version 2 of a streaming clone.

    the data stream consists the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        # NOTE(review): this initializer was dropped by the paste; it is
        # required by the entries.append(...) calls and len(entries) below.
        entries = []
        totalfilesize = 0

        repo.ui.debug('scanning\n')
        # append-only store files, streamed incrementally
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((_srcstore, name, _fileappend, size))
                totalfilesize += size
        # store files that need a full snapshot (eg: phaseroots)
        for name in _walkstreamfullstorefiles(repo):
            if repo.svfs.exists(name):
                totalfilesize += repo.svfs.lstat(name).st_size
                entries.append((_srcstore, name, _filefull, None))
        # cache files worth copying along (always full snapshots)
        for name in cacheutil.cachetocopy(repo):
            if repo.cachevfs.exists(name):
                totalfilesize += repo.cachevfs.lstat(name).st_size
                entries.append((_srccache, name, _filefull, None))

        chunks = _emit(repo, entries, totalfilesize)
        # advance the generator to the first yield so that the temporary
        # copies are made while we still hold the lock
        first = next(chunks)
        assert first is None

        return len(entries), totalfilesize, chunks
538 |
563 |
|
@contextlib.contextmanager
def nested(*ctxs):
    """enter all of ``ctxs`` as one combined context manager

    NOTE: contextlib.nested only exists on Python 2; this wrapper
    silences its DeprecationWarning.
    """
    with warnings.catch_warnings():
        # For some reason, Python decided 'nested' was deprecated without
        # replacement. They officially advertised for filtering the deprecation
        # warning for people who actually need the feature.
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        with contextlib.nested(*ctxs):
            yield
539 def consumev2(repo, fp, filecount, filesize): |
574 def consumev2(repo, fp, filecount, filesize): |
540 """Apply the contents from a version 2 streaming clone. |
575 """Apply the contents from a version 2 streaming clone. |
541 |
576 |
542 Data is read from an object that only needs to provide a ``read(size)`` |
577 Data is read from an object that only needs to provide a ``read(size)`` |
543 method. |
578 method. |
550 handledbytes = 0 |
585 handledbytes = 0 |
551 progress = repo.ui.progress |
586 progress = repo.ui.progress |
552 |
587 |
553 progress(_('clone'), handledbytes, total=filesize, unit=_('bytes')) |
588 progress(_('clone'), handledbytes, total=filesize, unit=_('bytes')) |
554 |
589 |
555 vfs = repo.svfs |
590 vfsmap = _makemap(repo) |
556 |
591 |
557 with repo.transaction('clone'): |
592 with repo.transaction('clone'): |
558 with vfs.backgroundclosing(repo.ui): |
593 ctxs = (vfs.backgroundclosing(repo.ui) |
|
594 for vfs in vfsmap.values()) |
|
595 with nested(*ctxs): |
559 for i in range(filecount): |
596 for i in range(filecount): |
|
597 src = fp.read(1) |
|
598 vfs = vfsmap[src] |
560 namelen = util.uvarintdecodestream(fp) |
599 namelen = util.uvarintdecodestream(fp) |
561 datalen = util.uvarintdecodestream(fp) |
600 datalen = util.uvarintdecodestream(fp) |
562 |
601 |
563 name = fp.read(namelen) |
602 name = fp.read(namelen) |
564 |
603 |
565 if repo.ui.debugflag: |
604 if repo.ui.debugflag: |
566 repo.ui.debug('adding %s (%s)\n' % |
605 repo.ui.debug('adding [%s] %s (%s)\n' % |
567 (name, util.bytecount(datalen))) |
606 (src, name, util.bytecount(datalen))) |
568 |
607 |
569 with vfs(name, 'w') as ofp: |
608 with vfs(name, 'w') as ofp: |
570 for chunk in util.filechunkiter(fp, limit=datalen): |
609 for chunk in util.filechunkiter(fp, limit=datalen): |
571 handledbytes += len(chunk) |
610 handledbytes += len(chunk) |
572 progress(_('clone'), handledbytes, total=filesize, |
611 progress(_('clone'), handledbytes, total=filesize, |