comparison mercurial/streamclone.py @ 35756:cfdccd560b66

streamclone: define first iteration of version 2 of stream format

(This patch is based on a first draft from Gregory Szorc, with deeper rework)

Version 1 of the stream clone format was invented many years ago and suffers from a few deficiencies:

1) Filenames are stored in store-encoded (on filesystem) form rather than in their internal form. This makes future compatibility with new store filename encodings more difficult.

2) File entry "headers" consist of a newline-terminated line holding the file name followed by the file size as a decimal string. Converting strings to integers is avoidable overhead. We can't store filenames with newlines (manifests have this limitation as well, so it isn't a major concern). But the big concern here is the necessity for readline(). Scanning for newlines means reading ahead, which means extra buffer allocations and slicing (in Python), and this makes performance suffer.

3) Filenames aren't compressed optimally. Filenames should compress well since there is a lot of repeated data. However, since they are scattered all over the stream (with revlog data in between), they typically fall outside the window size of the compressor and don't compress.

4) It can only exchange store-based content. Being able to exchange caches too would be nice.

5) It is limited to a stream-based protocol and isn't suitable as an on-disk format for general repository reading, because finding the offset of an individual file entry requires scanning the entire file for file records.

As part of enabling streaming clones to work in bundle2, #2 proved to have a significant negative impact on performance. Since bundle2 provides the opportunity to start fresh, Gregory Szorc figured he would take the opportunity to invent a new streaming clone data format.

The new format devised in this series addresses #1, #2, and #4. It punts on #3 because it was complex without yielding a significant gain, and on #5 because devising a new store format that "packs" multiple revlogs into a single "packed revlog" is massive scope bloat. However, this v2 format might be suitable for streaming into a "packed revlog" with minimal processing. If it works, great. If not, we can always invent a new stream format when it is needed.

This patch only introduces the basis of the format. We'll get it usable through bundle2 first, then we'll extend the format in future patches to bring it to its full potential (especially #4).
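To make the framing concrete, here is a minimal standalone sketch of a v2-style entry stream: two varints (filename length, data length) followed by the raw filename and the raw data. It assumes the varint is the usual unsigned encoding with 7 bits per byte, least significant group first, and a continuation bit in the high position. The helper names (uvarint_encode, uvarint_decode_stream, encode_entries, decode_entries) are hypothetical and are not Mercurial's own functions.

```python
import io


def uvarint_encode(value):
    """Encode a non-negative int, 7 bits per byte, low-order group first."""
    if value < 0:
        raise ValueError("varints are unsigned")
    out = bytearray()
    while True:
        bits = value & 0x7F
        value >>= 7
        if value:
            out.append(0x80 | bits)  # high bit set: more bytes follow
        else:
            out.append(bits)  # high bit clear: last byte
            return bytes(out)


def uvarint_decode_stream(fp):
    """Decode one varint from an object that only needs read(size)."""
    result = 0
    shift = 0
    while True:
        raw = fp.read(1)
        if len(raw) != 1:
            raise EOFError("stream ended inside a varint")
        byte = raw[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result
        shift += 7


def encode_entries(entries):
    """Yield the framing for (name, data) byte pairs, one piece at a time."""
    for name, data in entries:
        yield uvarint_encode(len(name))
        yield uvarint_encode(len(data))
        yield name
        yield data


def decode_entries(fp, filecount):
    """Read back filecount entries; no newline scanning, no int() parsing."""
    for _ in range(filecount):
        namelen = uvarint_decode_stream(fp)
        datalen = uvarint_decode_stream(fp)
        yield fp.read(namelen), fp.read(datalen)


# Hypothetical store file names and contents, for illustration only.
entries = [(b"data/foo.i", b"x" * 300), (b"00changelog.i", b"abc")]
stream = io.BytesIO(b"".join(encode_entries(entries)))
assert list(decode_entries(stream, len(entries))) == entries
```

Because both lengths are known before any payload byte is read, the reader never has to look ahead for a delimiter, which is the point of deficiency #2 above.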
author Boris Feld <boris.feld@octobus.net>
date Thu, 18 Jan 2018 00:48:56 +0100
parents ded3a63f305b
children bbf7abd09ff0
35755:2384523cee4d 35756:cfdccd560b66
    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)

def _emit(repo, entries, totalfilesize):
    """actually emit the stream bundle"""
    progress = repo.ui.progress
    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
    vfs = repo.svfs
    try:
        seen = 0
        for name, size in entries:
            yield util.uvarintencode(len(name))
            fp = vfs(name)
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    seen += len(chunk)
                    progress(_('bundle'), seen, total=totalfilesize,
                             unit=_('bytes'))
                    yield chunk
            finally:
                fp.close()
    finally:
        progress(_('bundle'), None)

def generatev2(repo):
    """Emit content for version 2 of a streaming clone.

    the data stream consists the following entries:
    1) A varint containing the length of the filename
    2) A varint containing the length of file data
    3) N bytes containing the filename (the internal, store-agnostic form)
    4) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        entries = []
        totalfilesize = 0

        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                totalfilesize += size

        chunks = _emit(repo, entries, totalfilesize)

    return len(entries), totalfilesize, chunks

def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(filesize)))

        start = util.timer()
        handledbytes = 0
        progress = repo.ui.progress

        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))

        vfs = repo.svfs

        with repo.transaction('clone'):
            with vfs.backgroundclosing(repo.ui):
                for i in range(filecount):
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = fp.read(namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(datalen)))

                    with vfs(name, 'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            handledbytes += len(chunk)
                            progress(_('clone'), handledbytes, total=filesize,
                                     unit=_('bytes'))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(handledbytes), elapsed,
                        util.bytecount(handledbytes / elapsed)))

def applybundlev2(repo, fp, filecount, filesize, requirements):
    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev2(repo, fp, filecount, filesize)
542 consumev2(repo, fp, filecount, filesize)