comparison mercurial/streamclone.py @ 43076:2372284d9457

formatting: blacken the codebase

This is using my patch to black (https://github.com/psf/black/pull/826) so we don't un-wrap collection literals.

Done with:

  hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S

# skip-blame mass-reformatting only

# no-check-commit reformats foo_bar functions

Differential Revision: https://phab.mercurial-scm.org/D6971
author Augie Fackler <augie@google.com>
date Sun, 06 Oct 2019 09:45:02 -0400
parents 268662aac075
children 687b865b95ad
comparison
equal deleted inserted replaced
43075:57875cf423c9 43076:2372284d9457
10 import contextlib 10 import contextlib
11 import os 11 import os
12 import struct 12 import struct
13 13
14 from .i18n import _ 14 from .i18n import _
15 from .interfaces import ( 15 from .interfaces import repository
16 repository,
17 )
18 from . import ( 16 from . import (
19 cacheutil, 17 cacheutil,
20 error, 18 error,
21 narrowspec, 19 narrowspec,
22 phases, 20 phases,
23 pycompat, 21 pycompat,
24 store, 22 store,
25 util, 23 util,
26 ) 24 )
27 25
26
28 def canperformstreamclone(pullop, bundle2=False): 27 def canperformstreamclone(pullop, bundle2=False):
29 """Whether it is possible to perform a streaming clone as part of pull. 28 """Whether it is possible to perform a streaming clone as part of pull.
30 29
31 ``bundle2`` will cause the function to consider stream clone through 30 ``bundle2`` will cause the function to consider stream clone through
32 bundle2 and only through bundle2. 31 bundle2 and only through bundle2.
42 bundle2supported = False 41 bundle2supported = False
43 if pullop.canusebundle2: 42 if pullop.canusebundle2:
44 if 'v2' in pullop.remotebundle2caps.get('stream', []): 43 if 'v2' in pullop.remotebundle2caps.get('stream', []):
45 bundle2supported = True 44 bundle2supported = True
46 # else 45 # else
47 # Server doesn't support bundle2 stream clone or doesn't support 46 # Server doesn't support bundle2 stream clone or doesn't support
48 # the versions we support. Fall back and possibly allow legacy. 47 # the versions we support. Fall back and possibly allow legacy.
49 48
50 # Ensures legacy code path uses available bundle2. 49 # Ensures legacy code path uses available bundle2.
51 if bundle2supported and not bundle2: 50 if bundle2supported and not bundle2:
52 return False, None 51 return False, None
53 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported. 52 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
85 requirements.add('revlogv1') 84 requirements.add('revlogv1')
86 else: 85 else:
87 streamreqs = remote.capable('streamreqs') 86 streamreqs = remote.capable('streamreqs')
88 # This is weird and shouldn't happen with modern servers. 87 # This is weird and shouldn't happen with modern servers.
89 if not streamreqs: 88 if not streamreqs:
90 pullop.repo.ui.warn(_( 89 pullop.repo.ui.warn(
91 'warning: stream clone requested but server has them ' 90 _(
92 'disabled\n')) 91 'warning: stream clone requested but server has them '
92 'disabled\n'
93 )
94 )
93 return False, None 95 return False, None
94 96
95 streamreqs = set(streamreqs.split(',')) 97 streamreqs = set(streamreqs.split(','))
96 # Server requires something we don't support. Bail. 98 # Server requires something we don't support. Bail.
97 missingreqs = streamreqs - repo.supportedformats 99 missingreqs = streamreqs - repo.supportedformats
98 if missingreqs: 100 if missingreqs:
99 pullop.repo.ui.warn(_(
100 'warning: stream clone requested but client is missing '
101 'requirements: %s\n') % ', '.join(sorted(missingreqs)))
102 pullop.repo.ui.warn( 101 pullop.repo.ui.warn(
103 _('(see https://www.mercurial-scm.org/wiki/MissingRequirement ' 102 _(
104 'for more information)\n')) 103 'warning: stream clone requested but client is missing '
104 'requirements: %s\n'
105 )
106 % ', '.join(sorted(missingreqs))
107 )
108 pullop.repo.ui.warn(
109 _(
110 '(see https://www.mercurial-scm.org/wiki/MissingRequirement '
111 'for more information)\n'
112 )
113 )
105 return False, None 114 return False, None
106 requirements = streamreqs 115 requirements = streamreqs
107 116
108 return True, requirements 117 return True, requirements
118
109 119
110 def maybeperformlegacystreamclone(pullop): 120 def maybeperformlegacystreamclone(pullop):
111 """Possibly perform a legacy stream clone operation. 121 """Possibly perform a legacy stream clone operation.
112 122
113 Legacy stream clones are performed as part of pull but before all other 123 Legacy stream clones are performed as part of pull but before all other
145 l = fp.readline() 155 l = fp.readline()
146 try: 156 try:
147 resp = int(l) 157 resp = int(l)
148 except ValueError: 158 except ValueError:
149 raise error.ResponseError( 159 raise error.ResponseError(
150 _('unexpected response from remote server:'), l) 160 _('unexpected response from remote server:'), l
161 )
151 if resp == 1: 162 if resp == 1:
152 raise error.Abort(_('operation forbidden by server')) 163 raise error.Abort(_('operation forbidden by server'))
153 elif resp == 2: 164 elif resp == 2:
154 raise error.Abort(_('locking the remote repository failed')) 165 raise error.Abort(_('locking the remote repository failed'))
155 elif resp != 0: 166 elif resp != 0:
158 l = fp.readline() 169 l = fp.readline()
159 try: 170 try:
160 filecount, bytecount = map(int, l.split(' ', 1)) 171 filecount, bytecount = map(int, l.split(' ', 1))
161 except (ValueError, TypeError): 172 except (ValueError, TypeError):
162 raise error.ResponseError( 173 raise error.ResponseError(
163 _('unexpected response from remote server:'), l) 174 _('unexpected response from remote server:'), l
175 )
164 176
165 with repo.lock(): 177 with repo.lock():
166 consumev1(repo, fp, filecount, bytecount) 178 consumev1(repo, fp, filecount, bytecount)
167 179
168 # new requirements = old non-format requirements + 180 # new requirements = old non-format requirements +
169 # new format-related remote requirements 181 # new format-related remote requirements
170 # requirements from the streamed-in repository 182 # requirements from the streamed-in repository
171 repo.requirements = requirements | ( 183 repo.requirements = requirements | (
172 repo.requirements - repo.supportedformats) 184 repo.requirements - repo.supportedformats
185 )
173 repo.svfs.options = localrepo.resolvestorevfsoptions( 186 repo.svfs.options = localrepo.resolvestorevfsoptions(
174 repo.ui, repo.requirements, repo.features) 187 repo.ui, repo.requirements, repo.features
188 )
175 repo._writerequirements() 189 repo._writerequirements()
176 190
177 if rbranchmap: 191 if rbranchmap:
178 repo._branchcaches.replace(repo, rbranchmap) 192 repo._branchcaches.replace(repo, rbranchmap)
179 193
180 repo.invalidate() 194 repo.invalidate()
195
181 196
182 def allowservergeneration(repo): 197 def allowservergeneration(repo):
183 """Whether streaming clones are allowed from the server.""" 198 """Whether streaming clones are allowed from the server."""
184 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features: 199 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
185 return False 200 return False
193 if secret: 208 if secret:
194 return repo.ui.configbool('server', 'uncompressedallowsecret') 209 return repo.ui.configbool('server', 'uncompressedallowsecret')
195 210
196 return True 211 return True
197 212
213
198 # This is its own function so extensions can override it. 214 # This is its own function so extensions can override it.
199 def _walkstreamfiles(repo, matcher=None): 215 def _walkstreamfiles(repo, matcher=None):
200 return repo.store.walk(matcher) 216 return repo.store.walk(matcher)
217
201 218
202 def generatev1(repo): 219 def generatev1(repo):
203 """Emit content for version 1 of a streaming clone. 220 """Emit content for version 1 of a streaming clone.
204 221
205 This returns a 3-tuple of (file count, byte size, data iterator). 222 This returns a 3-tuple of (file count, byte size, data iterator).
226 for name, ename, size in _walkstreamfiles(repo): 243 for name, ename, size in _walkstreamfiles(repo):
227 if size: 244 if size:
228 entries.append((name, size)) 245 entries.append((name, size))
229 total_bytes += size 246 total_bytes += size
230 247
231 repo.ui.debug('%d files, %d bytes to transfer\n' % 248 repo.ui.debug(
232 (len(entries), total_bytes)) 249 '%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
250 )
233 251
234 svfs = repo.svfs 252 svfs = repo.svfs
235 debugflag = repo.ui.debugflag 253 debugflag = repo.ui.debugflag
236 254
237 def emitrevlogdata(): 255 def emitrevlogdata():
249 for chunk in util.filechunkiter(fp, limit=size): 267 for chunk in util.filechunkiter(fp, limit=size):
250 yield chunk 268 yield chunk
251 269
252 return len(entries), total_bytes, emitrevlogdata() 270 return len(entries), total_bytes, emitrevlogdata()
253 271
272
254 def generatev1wireproto(repo): 273 def generatev1wireproto(repo):
255 """Emit content for version 1 of streaming clone suitable for the wire. 274 """Emit content for version 1 of streaming clone suitable for the wire.
256 275
257 This is the data output from ``generatev1()`` with 2 header lines. The 276 This is the data output from ``generatev1()`` with 2 header lines. The
258 first line indicates overall success. The 2nd contains the file count and 277 first line indicates overall success. The 2nd contains the file count and
276 yield '0\n' 295 yield '0\n'
277 yield '%d %d\n' % (filecount, bytecount) 296 yield '%d %d\n' % (filecount, bytecount)
278 for chunk in it: 297 for chunk in it:
279 yield chunk 298 yield chunk
280 299
300
281 def generatebundlev1(repo, compression='UN'): 301 def generatebundlev1(repo, compression='UN'):
282 """Emit content for version 1 of a stream clone bundle. 302 """Emit content for version 1 of a stream clone bundle.
283 303
284 The first 4 bytes of the output ("HGS1") denote this as stream clone 304 The first 4 bytes of the output ("HGS1") denote this as stream clone
285 bundle version 1. 305 bundle version 1.
309 def gen(): 329 def gen():
310 yield 'HGS1' 330 yield 'HGS1'
311 yield compression 331 yield compression
312 332
313 filecount, bytecount, it = generatev1(repo) 333 filecount, bytecount, it = generatev1(repo)
314 repo.ui.status(_('writing %d bytes for %d files\n') % 334 repo.ui.status(
315 (bytecount, filecount)) 335 _('writing %d bytes for %d files\n') % (bytecount, filecount)
336 )
316 337
317 yield struct.pack('>QQ', filecount, bytecount) 338 yield struct.pack('>QQ', filecount, bytecount)
318 yield struct.pack('>H', len(requires) + 1) 339 yield struct.pack('>H', len(requires) + 1)
319 yield requires + '\0' 340 yield requires + '\0'
320 341
321 # This is where we'll add compression in the future. 342 # This is where we'll add compression in the future.
322 assert compression == 'UN' 343 assert compression == 'UN'
323 344
324 progress = repo.ui.makeprogress(_('bundle'), total=bytecount, 345 progress = repo.ui.makeprogress(
325 unit=_('bytes')) 346 _('bundle'), total=bytecount, unit=_('bytes')
347 )
326 progress.update(0) 348 progress.update(0)
327 349
328 for chunk in it: 350 for chunk in it:
329 progress.increment(step=len(chunk)) 351 progress.increment(step=len(chunk))
330 yield chunk 352 yield chunk
331 353
332 progress.complete() 354 progress.complete()
333 355
334 return requirements, gen() 356 return requirements, gen()
335 357
358
336 def consumev1(repo, fp, filecount, bytecount): 359 def consumev1(repo, fp, filecount, bytecount):
337 """Apply the contents from version 1 of a streaming clone file handle. 360 """Apply the contents from version 1 of a streaming clone file handle.
338 361
339 This takes the output from "stream_out" and applies it to the specified 362 This takes the output from "stream_out" and applies it to the specified
340 repository. 363 repository.
341 364
342 Like "stream_out," the status line added by the wire protocol is not 365 Like "stream_out," the status line added by the wire protocol is not
343 handled by this function. 366 handled by this function.
344 """ 367 """
345 with repo.lock(): 368 with repo.lock():
346 repo.ui.status(_('%d files to transfer, %s of data\n') % 369 repo.ui.status(
347 (filecount, util.bytecount(bytecount))) 370 _('%d files to transfer, %s of data\n')
348 progress = repo.ui.makeprogress(_('clone'), total=bytecount, 371 % (filecount, util.bytecount(bytecount))
349 unit=_('bytes')) 372 )
373 progress = repo.ui.makeprogress(
374 _('clone'), total=bytecount, unit=_('bytes')
375 )
350 progress.update(0) 376 progress.update(0)
351 start = util.timer() 377 start = util.timer()
352 378
353 # TODO: get rid of (potential) inconsistency 379 # TODO: get rid of (potential) inconsistency
354 # 380 #
372 try: 398 try:
373 name, size = l.split('\0', 1) 399 name, size = l.split('\0', 1)
374 size = int(size) 400 size = int(size)
375 except (ValueError, TypeError): 401 except (ValueError, TypeError):
376 raise error.ResponseError( 402 raise error.ResponseError(
377 _('unexpected response from remote server:'), l) 403 _('unexpected response from remote server:'), l
404 )
378 if repo.ui.debugflag: 405 if repo.ui.debugflag:
379 repo.ui.debug('adding %s (%s)\n' % 406 repo.ui.debug(
380 (name, util.bytecount(size))) 407 'adding %s (%s)\n' % (name, util.bytecount(size))
408 )
381 # for backwards compat, name was partially encoded 409 # for backwards compat, name was partially encoded
382 path = store.decodedir(name) 410 path = store.decodedir(name)
383 with repo.svfs(path, 'w', backgroundclose=True) as ofp: 411 with repo.svfs(path, 'w', backgroundclose=True) as ofp:
384 for chunk in util.filechunkiter(fp, limit=size): 412 for chunk in util.filechunkiter(fp, limit=size):
385 progress.increment(step=len(chunk)) 413 progress.increment(step=len(chunk))
391 419
392 elapsed = util.timer() - start 420 elapsed = util.timer() - start
393 if elapsed <= 0: 421 if elapsed <= 0:
394 elapsed = 0.001 422 elapsed = 0.001
395 progress.complete() 423 progress.complete()
396 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % 424 repo.ui.status(
397 (util.bytecount(bytecount), elapsed, 425 _('transferred %s in %.1f seconds (%s/sec)\n')
398 util.bytecount(bytecount / elapsed))) 426 % (
427 util.bytecount(bytecount),
428 elapsed,
429 util.bytecount(bytecount / elapsed),
430 )
431 )
432
399 433
400 def readbundle1header(fp): 434 def readbundle1header(fp):
401 compression = fp.read(2) 435 compression = fp.read(2)
402 if compression != 'UN': 436 if compression != 'UN':
403 raise error.Abort(_('only uncompressed stream clone bundles are ' 437 raise error.Abort(
404 'supported; got %s') % compression) 438 _('only uncompressed stream clone bundles are ' 'supported; got %s')
439 % compression
440 )
405 441
406 filecount, bytecount = struct.unpack('>QQ', fp.read(16)) 442 filecount, bytecount = struct.unpack('>QQ', fp.read(16))
407 requireslen = struct.unpack('>H', fp.read(2))[0] 443 requireslen = struct.unpack('>H', fp.read(2))[0]
408 requires = fp.read(requireslen) 444 requires = fp.read(requireslen)
409 445
410 if not requires.endswith('\0'): 446 if not requires.endswith('\0'):
411 raise error.Abort(_('malformed stream clone bundle: ' 447 raise error.Abort(
412 'requirements not properly encoded')) 448 _(
449 'malformed stream clone bundle: '
450 'requirements not properly encoded'
451 )
452 )
413 453
414 requirements = set(requires.rstrip('\0').split(',')) 454 requirements = set(requires.rstrip('\0').split(','))
415 455
416 return filecount, bytecount, requirements 456 return filecount, bytecount, requirements
457
417 458
418 def applybundlev1(repo, fp): 459 def applybundlev1(repo, fp):
419 """Apply the content from a stream clone bundle version 1. 460 """Apply the content from a stream clone bundle version 1.
420 461
421 We assume the 4 byte header has been read and validated and the file handle 462 We assume the 4 byte header has been read and validated and the file handle
422 is at the 2 byte compression identifier. 463 is at the 2 byte compression identifier.
423 """ 464 """
424 if len(repo): 465 if len(repo):
425 raise error.Abort(_('cannot apply stream clone bundle on non-empty ' 466 raise error.Abort(
426 'repo')) 467 _('cannot apply stream clone bundle on non-empty ' 'repo')
468 )
427 469
428 filecount, bytecount, requirements = readbundle1header(fp) 470 filecount, bytecount, requirements = readbundle1header(fp)
429 missingreqs = requirements - repo.supportedformats 471 missingreqs = requirements - repo.supportedformats
430 if missingreqs: 472 if missingreqs:
431 raise error.Abort(_('unable to apply stream clone: ' 473 raise error.Abort(
432 'unsupported format: %s') % 474 _('unable to apply stream clone: ' 'unsupported format: %s')
433 ', '.join(sorted(missingreqs))) 475 % ', '.join(sorted(missingreqs))
476 )
434 477
435 consumev1(repo, fp, filecount, bytecount) 478 consumev1(repo, fp, filecount, bytecount)
479
436 480
437 class streamcloneapplier(object): 481 class streamcloneapplier(object):
438 """Class to manage applying streaming clone bundles. 482 """Class to manage applying streaming clone bundles.
439 483
440 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle 484 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
441 readers to perform bundle type-specific functionality. 485 readers to perform bundle type-specific functionality.
442 """ 486 """
487
443 def __init__(self, fh): 488 def __init__(self, fh):
444 self._fh = fh 489 self._fh = fh
445 490
446 def apply(self, repo): 491 def apply(self, repo):
447 return applybundlev1(repo, self._fh) 492 return applybundlev1(repo, self._fh)
448 493
494
449 # type of file to stream 495 # type of file to stream
450 _fileappend = 0 # append only file 496 _fileappend = 0 # append only file
451 _filefull = 1 # full snapshot file 497 _filefull = 1 # full snapshot file
452 498
453 # Source of the file 499 # Source of the file
454 _srcstore = 's' # store (svfs) 500 _srcstore = 's' # store (svfs)
455 _srccache = 'c' # cache (cache) 501 _srccache = 'c' # cache (cache)
456 502
457 # This is its own function so extensions can override it. 503 # This is its own function so extensions can override it.
458 def _walkstreamfullstorefiles(repo): 504 def _walkstreamfullstorefiles(repo):
459 """list snapshot file from the store""" 505 """list snapshot file from the store"""
460 fnames = [] 506 fnames = []
461 if not repo.publishing(): 507 if not repo.publishing():
462 fnames.append('phaseroots') 508 fnames.append('phaseroots')
463 return fnames 509 return fnames
464 510
511
465 def _filterfull(entry, copy, vfsmap): 512 def _filterfull(entry, copy, vfsmap):
466 """actually copy the snapshot files""" 513 """actually copy the snapshot files"""
467 src, name, ftype, data = entry 514 src, name, ftype, data = entry
468 if ftype != _filefull: 515 if ftype != _filefull:
469 return entry 516 return entry
470 return (src, name, ftype, copy(vfsmap[src].join(name))) 517 return (src, name, ftype, copy(vfsmap[src].join(name)))
471 518
519
472 @contextlib.contextmanager 520 @contextlib.contextmanager
473 def maketempcopies(): 521 def maketempcopies():
474 """return a function to temporary copy file""" 522 """return a function to temporary copy file"""
475 files = [] 523 files = []
476 try: 524 try:
525
477 def copy(src): 526 def copy(src):
478 fd, dst = pycompat.mkstemp() 527 fd, dst = pycompat.mkstemp()
479 os.close(fd) 528 os.close(fd)
480 files.append(dst) 529 files.append(dst)
481 util.copyfiles(src, dst, hardlink=True) 530 util.copyfiles(src, dst, hardlink=True)
482 return dst 531 return dst
532
483 yield copy 533 yield copy
484 finally: 534 finally:
485 for tmp in files: 535 for tmp in files:
486 util.tryunlink(tmp) 536 util.tryunlink(tmp)
537
487 538
488 def _makemap(repo): 539 def _makemap(repo):
489 """make a (src -> vfs) map for the repo""" 540 """make a (src -> vfs) map for the repo"""
490 vfsmap = { 541 vfsmap = {
491 _srcstore: repo.svfs, 542 _srcstore: repo.svfs,
495 # (eg: .hg/hgrc) 546 # (eg: .hg/hgrc)
496 assert repo.vfs not in vfsmap.values() 547 assert repo.vfs not in vfsmap.values()
497 548
498 return vfsmap 549 return vfsmap
499 550
551
500 def _emit2(repo, entries, totalfilesize): 552 def _emit2(repo, entries, totalfilesize):
501 """actually emit the stream bundle""" 553 """actually emit the stream bundle"""
502 vfsmap = _makemap(repo) 554 vfsmap = _makemap(repo)
503 progress = repo.ui.makeprogress(_('bundle'), total=totalfilesize, 555 progress = repo.ui.makeprogress(
504 unit=_('bytes')) 556 _('bundle'), total=totalfilesize, unit=_('bytes')
557 )
505 progress.update(0) 558 progress.update(0)
506 with maketempcopies() as copy, progress: 559 with maketempcopies() as copy, progress:
507 # copy is delayed until we are in the try 560 # copy is delayed until we are in the try
508 entries = [_filterfull(e, copy, vfsmap) for e in entries] 561 entries = [_filterfull(e, copy, vfsmap) for e in entries]
509 yield None # this release the lock on the repository 562 yield None # this release the lock on the repository
510 seen = 0 563 seen = 0
511 564
512 for src, name, ftype, data in entries: 565 for src, name, ftype, data in entries:
513 vfs = vfsmap[src] 566 vfs = vfsmap[src]
514 yield src 567 yield src
531 progress.update(seen) 584 progress.update(seen)
532 yield chunk 585 yield chunk
533 finally: 586 finally:
534 fp.close() 587 fp.close()
535 588
589
536 def generatev2(repo, includes, excludes, includeobsmarkers): 590 def generatev2(repo, includes, excludes, includeobsmarkers):
537 """Emit content for version 2 of a streaming clone. 591 """Emit content for version 2 of a streaming clone.
538 592
539 the data stream consists the following entries: 593 the data stream consists the following entries:
540 1) A char representing the file destination (eg: store or cache) 594 1) A char representing the file destination (eg: store or cache)
576 first = next(chunks) 630 first = next(chunks)
577 assert first is None 631 assert first is None
578 632
579 return len(entries), totalfilesize, chunks 633 return len(entries), totalfilesize, chunks
580 634
635
581 @contextlib.contextmanager 636 @contextlib.contextmanager
582 def nested(*ctxs): 637 def nested(*ctxs):
583 this = ctxs[0] 638 this = ctxs[0]
584 rest = ctxs[1:] 639 rest = ctxs[1:]
585 with this: 640 with this:
587 with nested(*rest): 642 with nested(*rest):
588 yield 643 yield
589 else: 644 else:
590 yield 645 yield
591 646
647
592 def consumev2(repo, fp, filecount, filesize): 648 def consumev2(repo, fp, filecount, filesize):
593 """Apply the contents from a version 2 streaming clone. 649 """Apply the contents from a version 2 streaming clone.
594 650
595 Data is read from an object that only needs to provide a ``read(size)`` 651 Data is read from an object that only needs to provide a ``read(size)``
596 method. 652 method.
597 """ 653 """
598 with repo.lock(): 654 with repo.lock():
599 repo.ui.status(_('%d files to transfer, %s of data\n') % 655 repo.ui.status(
600 (filecount, util.bytecount(filesize))) 656 _('%d files to transfer, %s of data\n')
657 % (filecount, util.bytecount(filesize))
658 )
601 659
602 start = util.timer() 660 start = util.timer()
603 progress = repo.ui.makeprogress(_('clone'), total=filesize, 661 progress = repo.ui.makeprogress(
604 unit=_('bytes')) 662 _('clone'), total=filesize, unit=_('bytes')
663 )
605 progress.update(0) 664 progress.update(0)
606 665
607 vfsmap = _makemap(repo) 666 vfsmap = _makemap(repo)
608 667
609 with repo.transaction('clone'): 668 with repo.transaction('clone'):
610 ctxs = (vfs.backgroundclosing(repo.ui) 669 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
611 for vfs in vfsmap.values())
612 with nested(*ctxs): 670 with nested(*ctxs):
613 for i in range(filecount): 671 for i in range(filecount):
614 src = util.readexactly(fp, 1) 672 src = util.readexactly(fp, 1)
615 vfs = vfsmap[src] 673 vfs = vfsmap[src]
616 namelen = util.uvarintdecodestream(fp) 674 namelen = util.uvarintdecodestream(fp)
617 datalen = util.uvarintdecodestream(fp) 675 datalen = util.uvarintdecodestream(fp)
618 676
619 name = util.readexactly(fp, namelen) 677 name = util.readexactly(fp, namelen)
620 678
621 if repo.ui.debugflag: 679 if repo.ui.debugflag:
622 repo.ui.debug('adding [%s] %s (%s)\n' % 680 repo.ui.debug(
623 (src, name, util.bytecount(datalen))) 681 'adding [%s] %s (%s)\n'
682 % (src, name, util.bytecount(datalen))
683 )
624 684
625 with vfs(name, 'w') as ofp: 685 with vfs(name, 'w') as ofp:
626 for chunk in util.filechunkiter(fp, limit=datalen): 686 for chunk in util.filechunkiter(fp, limit=datalen):
627 progress.increment(step=len(chunk)) 687 progress.increment(step=len(chunk))
628 ofp.write(chunk) 688 ofp.write(chunk)
632 repo.invalidate(clearfilecache=True) 692 repo.invalidate(clearfilecache=True)
633 693
634 elapsed = util.timer() - start 694 elapsed = util.timer() - start
635 if elapsed <= 0: 695 if elapsed <= 0:
636 elapsed = 0.001 696 elapsed = 0.001
637 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % 697 repo.ui.status(
638 (util.bytecount(progress.pos), elapsed, 698 _('transferred %s in %.1f seconds (%s/sec)\n')
639 util.bytecount(progress.pos / elapsed))) 699 % (
700 util.bytecount(progress.pos),
701 elapsed,
702 util.bytecount(progress.pos / elapsed),
703 )
704 )
640 progress.complete() 705 progress.complete()
706
641 707
642 def applybundlev2(repo, fp, filecount, filesize, requirements): 708 def applybundlev2(repo, fp, filecount, filesize, requirements):
643 from . import localrepo 709 from . import localrepo
644 710
645 missingreqs = [r for r in requirements if r not in repo.supported] 711 missingreqs = [r for r in requirements if r not in repo.supported]
646 if missingreqs: 712 if missingreqs:
647 raise error.Abort(_('unable to apply stream clone: ' 713 raise error.Abort(
648 'unsupported format: %s') % 714 _('unable to apply stream clone: ' 'unsupported format: %s')
649 ', '.join(sorted(missingreqs))) 715 % ', '.join(sorted(missingreqs))
716 )
650 717
651 consumev2(repo, fp, filecount, filesize) 718 consumev2(repo, fp, filecount, filesize)
652 719
653 # new requirements = old non-format requirements + 720 # new requirements = old non-format requirements +
654 # new format-related remote requirements 721 # new format-related remote requirements
655 # requirements from the streamed-in repository 722 # requirements from the streamed-in repository
656 repo.requirements = set(requirements) | ( 723 repo.requirements = set(requirements) | (
657 repo.requirements - repo.supportedformats) 724 repo.requirements - repo.supportedformats
725 )
658 repo.svfs.options = localrepo.resolvestorevfsoptions( 726 repo.svfs.options = localrepo.resolvestorevfsoptions(
659 repo.ui, repo.requirements, repo.features) 727 repo.ui, repo.requirements, repo.features
728 )
660 repo._writerequirements() 729 repo._writerequirements()