    def __init__(self, fh):
        # File object containing the raw stream clone data; it is handed
        # to applybundlev1() when apply() is called.
        self._fh = fh
428 |
428 |
429 def apply(self, repo): |
429 def apply(self, repo): |
430 return applybundlev1(repo, self._fh) |
430 return applybundlev1(repo, self._fh) |
|
431 |
|
def _emit(repo, entries, totalfilesize):
    """Generator producing the serialized stream bundle for ``entries``.

    For every ``(name, size)`` pair the following is yielded, in order:
    a varint with the length of ``name``, a varint with ``size``, the
    name itself, and then the file data read from the store vfs.
    Progress is reported against ``totalfilesize`` and cleared once the
    generator terminates, normally or not.
    """
    progress = repo.ui.progress
    vfs = repo.svfs
    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
    try:
        bytessent = 0
        for name, size in entries:
            yield util.uvarintencode(len(name))
            fp = vfs(name)
            try:
                yield util.uvarintencode(size)
                yield name
                # Small files are read in a single call; anything larger
                # is streamed in chunks to bound memory use.
                if size <= 65536:
                    datachunks = (fp.read(size),)
                else:
                    datachunks = util.filechunkiter(fp, limit=size)
                for datachunk in datachunks:
                    bytessent += len(datachunk)
                    progress(_('bundle'), bytessent,
                             total=totalfilesize, unit=_('bytes'))
                    yield datachunk
            finally:
                fp.close()
    finally:
        # Always clear the progress bar, even on error or early abandon.
        progress(_('bundle'), None)
|
458 |
|
def generatev2(repo):
    """Emit content for version 2 of a streaming clone.

    Every entry in the data stream is encoded as:

    1) A varint holding the length of the file name
    2) A varint holding the length of the file data
    3) N bytes holding the file name (the internal, store-agnostic form)
    4) N bytes holding the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        repo.ui.debug('scanning\n')
        # Zero-length files are skipped entirely, matching the historical
        # behavior of only emitting entries that carry data.
        entries = [(name, size)
                   for name, ename, size in _walkstreamfiles(repo)
                   if size]
        totalfilesize = sum(size for name, size in entries)

        # NOTE(review): _emit is a generator, so the actual file reads
        # happen lazily when the caller consumes ``chunks`` — presumably
        # after this lock has been released; confirm callers hold any
        # locking they need.
        chunks = _emit(repo, entries, totalfilesize)

        return len(entries), totalfilesize, chunks
|
485 |
|
def consumev2(repo, fp, filecount, filesize):
    """Read a version 2 streaming clone from ``fp`` and apply it to ``repo``.

    ``fp`` only needs to provide a ``read(size)`` method. ``filecount``
    tells us how many entries to consume; ``filesize`` drives progress
    reporting and the final transfer-rate summary.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(filesize)))

        starttime = util.timer()
        progress = repo.ui.progress
        bytesdone = 0

        progress(_('clone'), bytesdone, total=filesize, unit=_('bytes'))

        vfs = repo.svfs

        with repo.transaction('clone'):
            with vfs.backgroundclosing(repo.ui):
                for _i in range(filecount):
                    # Wire format: namelen varint, datalen varint, name
                    # bytes, then datalen bytes of file content.
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = fp.read(namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(datalen)))

                    with vfs(name, 'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            bytesdone += len(chunk)
                            progress(_('clone'), bytesdone, total=filesize,
                                     unit=_('bytes'))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - starttime
        # Guard against a zero/negative interval so the rate division
        # below cannot blow up.
        if elapsed <= 0:
            elapsed = 0.001
        progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytesdone), elapsed,
                        util.bytecount(bytesdone / elapsed)))
|
534 |
|
def applybundlev2(repo, fp, filecount, filesize, requirements):
    """Validate ``requirements`` against ``repo`` and apply a v2 stream.

    Raises ``error.Abort`` listing any stream requirement this repository
    does not support; otherwise delegates to consumev2().
    """
    supported = repo.supported
    missingreqs = sorted(r for r in requirements if r not in supported)
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(missingreqs))

    consumev2(repo, fp, filecount, filesize)