Mercurial > hg
comparison mercurial/streamclone.py @ 35756:cfdccd560b66
streamclone: define first iteration of version 2 of stream format
(This patch is based on a first draft from Gregory Szorc, with deeper rework)
Version 1 of the stream clone format was invented many years ago and suffers
from a few deficiencies:
1) Filenames are stored in store-encoded (on filesystem) form rather than in
their internal form. This makes future compatibility with new store
filename encodings more difficult.
2) File entry "headers" consist of a newline-delimited file name followed by the
file size encoded as a string. Converting strings to integers is avoidable overhead. We
can't store filenames with newlines (manifests have this limitation as
well, so it isn't a major concern). But the big concern here is the
necessity for readline(). Scanning for newlines means reading ahead and
that means extra buffer allocations and slicing (in Python) and this makes
performance suffer.
3) Filenames aren't compressed optimally. Filenames should be compressed well
since there is a lot of repeated data. However, since they are scattered
all over the stream (with revlog data in between), they typically fall
outside the window size of the compressor and don't compress.
4) It can only exchange store-based content; being able to exchange caches
too would be nice.
5) It is limited to a stream-based protocol and isn't suitable for an on-disk
format for general repository reading because the offset of individual file
entries requires scanning the entire file to find file records.
As part of enabling streaming clones to work in bundle2, #2 proved to have a
significant negative impact on performance. Since bundle2 provides the
opportunity to start fresh, Gregory Szorc figured he would take the
opportunity to invent a new streaming clone data format.
The new format devised in this series addresses #1, #2, and #4. It punts on #3
because it was complex without yielding a significant gain and on #5 because
devising a new store format that "packs" multiple revlogs into a single
"packed revlog" is massive scope bloat. However, this v2 format might be
suitable for streaming into a "packed revlog" with minimal processing. If it
works, great. If not, we can always invent a new stream format when it is needed.
This patch only introduces the basis of the format. We'll get it usable through
bundle2 first, then we'll extend the format in future patches to bring it to its
full potential (especially #4).
author | Boris Feld <boris.feld@octobus.net> |
---|---|
date | Thu, 18 Jan 2018 00:48:56 +0100 |
parents | ded3a63f305b |
children | bbf7abd09ff0 |
comparison
equal
deleted
inserted
replaced
35755:2384523cee4d | 35756:cfdccd560b66 |
---|---|
426 def __init__(self, fh): | 426 def __init__(self, fh): |
427 self._fh = fh | 427 self._fh = fh |
428 | 428 |
429 def apply(self, repo): | 429 def apply(self, repo): |
430 return applybundlev1(repo, self._fh) | 430 return applybundlev1(repo, self._fh) |
def _emit(repo, entries, totalfilesize):
    """Yield the raw byte chunks making up a version 2 stream bundle.

    ``entries`` is an iterable of ``(name, size)`` pairs. Each entry is
    emitted as a uvarint name length, a uvarint data length, the file
    name, then the file content read from the store vfs. Progress is
    reported against ``totalfilesize`` and cleared on the way out.
    """
    progress = repo.ui.progress
    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
    vfs = repo.svfs
    bytessent = 0
    try:
        for name, size in entries:
            yield util.uvarintencode(len(name))
            src = vfs(name)
            try:
                yield util.uvarintencode(size)
                yield name
                # read small files in one gulp; chunk larger ones
                if size <= 65536:
                    datachunks = (src.read(size),)
                else:
                    datachunks = util.filechunkiter(src, limit=size)
                for piece in datachunks:
                    bytessent += len(piece)
                    progress(_('bundle'), bytessent,
                             total=totalfilesize, unit=_('bytes'))
                    yield piece
            finally:
                src.close()
    finally:
        # always clear the progress bar, even on error/early close
        progress(_('bundle'), None)
def generatev2(repo):
    """Emit content for version 2 of a streaming clone.

    The stream is a flat sequence of file records, each consisting of:

    1) A uvarint with the length of the file name
    2) A uvarint with the length of the file data
    3) N bytes containing the file name (the internal, store-agnostic form)
    4) N bytes containing the file data

    Returns a 3-tuple of (file count, total file size, data iterator).

    NOTE(review): the data iterator is lazy, so the repo lock taken here
    is released before the data is actually emitted — confirm callers
    consume the stream promptly.
    """
    with repo.lock():
        repo.ui.debug('scanning\n')
        # only files with actual content are part of the stream
        entries = [(name, size)
                   for name, ename, size in _walkstreamfiles(repo)
                   if size]
        totalfilesize = sum(size for name, size in entries)

        chunks = _emit(repo, entries, totalfilesize)

    return len(entries), totalfilesize, chunks
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.

    ``filecount`` and ``filesize`` come from the stream header and drive
    the read loop and the progress report; they are trusted to match the
    actual payload.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(filesize)))

        start = util.timer()
        handledbytes = 0
        progress = repo.ui.progress

        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))

        vfs = repo.svfs

        with repo.transaction('clone'):
            # backgroundclosing lets file handles be closed off the main
            # thread while the next file is being written
            with vfs.backgroundclosing(repo.ui):
                for i in range(filecount):
                    # each record: <uvarint name length><uvarint data length>
                    # followed by the name bytes and the raw file data
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = fp.read(namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(datalen)))

                    with vfs(name, 'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            handledbytes += len(chunk)
                            progress(_('clone'), handledbytes, total=filesize,
                                     unit=_('bytes'))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        # guard against a zero/negative clock delta in the rate computation
        if elapsed <= 0:
            elapsed = 0.001
        progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(handledbytes), elapsed,
                        util.bytecount(handledbytes / elapsed)))
def applybundlev2(repo, fp, filecount, filesize, requirements):
    """Check ``requirements`` against the repo, then consume a v2 stream.

    Aborts with a user-facing error when the stream was produced with
    repository features this repository does not support; otherwise the
    payload in ``fp`` is applied via ``consumev2``.
    """
    supported = repo.supported
    missingreqs = sorted(r for r in requirements if r not in supported)
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(missingreqs))

    consumev2(repo, fp, filecount, filesize)