Mercurial > hg
comparison mercurial/streamclone.py @ 43076:2372284d9457
formatting: blacken the codebase
This is using my patch to black
(https://github.com/psf/black/pull/826) so we don't un-wrap collection
literals.
Done with:
hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S
# skip-blame mass-reformatting only
# no-check-commit reformats foo_bar functions
Differential Revision: https://phab.mercurial-scm.org/D6971
author | Augie Fackler <augie@google.com> |
---|---|
date | Sun, 06 Oct 2019 09:45:02 -0400 |
parents | 268662aac075 |
children | 687b865b95ad |
comparison
equal
deleted
inserted
replaced
43075:57875cf423c9 | 43076:2372284d9457 |
---|---|
10 import contextlib | 10 import contextlib |
11 import os | 11 import os |
12 import struct | 12 import struct |
13 | 13 |
14 from .i18n import _ | 14 from .i18n import _ |
15 from .interfaces import ( | 15 from .interfaces import repository |
16 repository, | |
17 ) | |
18 from . import ( | 16 from . import ( |
19 cacheutil, | 17 cacheutil, |
20 error, | 18 error, |
21 narrowspec, | 19 narrowspec, |
22 phases, | 20 phases, |
23 pycompat, | 21 pycompat, |
24 store, | 22 store, |
25 util, | 23 util, |
26 ) | 24 ) |
27 | 25 |
26 | |
28 def canperformstreamclone(pullop, bundle2=False): | 27 def canperformstreamclone(pullop, bundle2=False): |
29 """Whether it is possible to perform a streaming clone as part of pull. | 28 """Whether it is possible to perform a streaming clone as part of pull. |
30 | 29 |
31 ``bundle2`` will cause the function to consider stream clone through | 30 ``bundle2`` will cause the function to consider stream clone through |
32 bundle2 and only through bundle2. | 31 bundle2 and only through bundle2. |
42 bundle2supported = False | 41 bundle2supported = False |
43 if pullop.canusebundle2: | 42 if pullop.canusebundle2: |
44 if 'v2' in pullop.remotebundle2caps.get('stream', []): | 43 if 'v2' in pullop.remotebundle2caps.get('stream', []): |
45 bundle2supported = True | 44 bundle2supported = True |
46 # else | 45 # else |
47 # Server doesn't support bundle2 stream clone or doesn't support | 46 # Server doesn't support bundle2 stream clone or doesn't support |
48 # the versions we support. Fall back and possibly allow legacy. | 47 # the versions we support. Fall back and possibly allow legacy. |
49 | 48 |
50 # Ensures legacy code path uses available bundle2. | 49 # Ensures legacy code path uses available bundle2. |
51 if bundle2supported and not bundle2: | 50 if bundle2supported and not bundle2: |
52 return False, None | 51 return False, None |
53 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported. | 52 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported. |
85 requirements.add('revlogv1') | 84 requirements.add('revlogv1') |
86 else: | 85 else: |
87 streamreqs = remote.capable('streamreqs') | 86 streamreqs = remote.capable('streamreqs') |
88 # This is weird and shouldn't happen with modern servers. | 87 # This is weird and shouldn't happen with modern servers. |
89 if not streamreqs: | 88 if not streamreqs: |
90 pullop.repo.ui.warn(_( | 89 pullop.repo.ui.warn( |
91 'warning: stream clone requested but server has them ' | 90 _( |
92 'disabled\n')) | 91 'warning: stream clone requested but server has them ' |
92 'disabled\n' | |
93 ) | |
94 ) | |
93 return False, None | 95 return False, None |
94 | 96 |
95 streamreqs = set(streamreqs.split(',')) | 97 streamreqs = set(streamreqs.split(',')) |
96 # Server requires something we don't support. Bail. | 98 # Server requires something we don't support. Bail. |
97 missingreqs = streamreqs - repo.supportedformats | 99 missingreqs = streamreqs - repo.supportedformats |
98 if missingreqs: | 100 if missingreqs: |
99 pullop.repo.ui.warn(_( | |
100 'warning: stream clone requested but client is missing ' | |
101 'requirements: %s\n') % ', '.join(sorted(missingreqs))) | |
102 pullop.repo.ui.warn( | 101 pullop.repo.ui.warn( |
103 _('(see https://www.mercurial-scm.org/wiki/MissingRequirement ' | 102 _( |
104 'for more information)\n')) | 103 'warning: stream clone requested but client is missing ' |
104 'requirements: %s\n' | |
105 ) | |
106 % ', '.join(sorted(missingreqs)) | |
107 ) | |
108 pullop.repo.ui.warn( | |
109 _( | |
110 '(see https://www.mercurial-scm.org/wiki/MissingRequirement ' | |
111 'for more information)\n' | |
112 ) | |
113 ) | |
105 return False, None | 114 return False, None |
106 requirements = streamreqs | 115 requirements = streamreqs |
107 | 116 |
108 return True, requirements | 117 return True, requirements |
118 | |
109 | 119 |
110 def maybeperformlegacystreamclone(pullop): | 120 def maybeperformlegacystreamclone(pullop): |
111 """Possibly perform a legacy stream clone operation. | 121 """Possibly perform a legacy stream clone operation. |
112 | 122 |
113 Legacy stream clones are performed as part of pull but before all other | 123 Legacy stream clones are performed as part of pull but before all other |
145 l = fp.readline() | 155 l = fp.readline() |
146 try: | 156 try: |
147 resp = int(l) | 157 resp = int(l) |
148 except ValueError: | 158 except ValueError: |
149 raise error.ResponseError( | 159 raise error.ResponseError( |
150 _('unexpected response from remote server:'), l) | 160 _('unexpected response from remote server:'), l |
161 ) | |
151 if resp == 1: | 162 if resp == 1: |
152 raise error.Abort(_('operation forbidden by server')) | 163 raise error.Abort(_('operation forbidden by server')) |
153 elif resp == 2: | 164 elif resp == 2: |
154 raise error.Abort(_('locking the remote repository failed')) | 165 raise error.Abort(_('locking the remote repository failed')) |
155 elif resp != 0: | 166 elif resp != 0: |
158 l = fp.readline() | 169 l = fp.readline() |
159 try: | 170 try: |
160 filecount, bytecount = map(int, l.split(' ', 1)) | 171 filecount, bytecount = map(int, l.split(' ', 1)) |
161 except (ValueError, TypeError): | 172 except (ValueError, TypeError): |
162 raise error.ResponseError( | 173 raise error.ResponseError( |
163 _('unexpected response from remote server:'), l) | 174 _('unexpected response from remote server:'), l |
175 ) | |
164 | 176 |
165 with repo.lock(): | 177 with repo.lock(): |
166 consumev1(repo, fp, filecount, bytecount) | 178 consumev1(repo, fp, filecount, bytecount) |
167 | 179 |
168 # new requirements = old non-format requirements + | 180 # new requirements = old non-format requirements + |
169 # new format-related remote requirements | 181 # new format-related remote requirements |
170 # requirements from the streamed-in repository | 182 # requirements from the streamed-in repository |
171 repo.requirements = requirements | ( | 183 repo.requirements = requirements | ( |
172 repo.requirements - repo.supportedformats) | 184 repo.requirements - repo.supportedformats |
185 ) | |
173 repo.svfs.options = localrepo.resolvestorevfsoptions( | 186 repo.svfs.options = localrepo.resolvestorevfsoptions( |
174 repo.ui, repo.requirements, repo.features) | 187 repo.ui, repo.requirements, repo.features |
188 ) | |
175 repo._writerequirements() | 189 repo._writerequirements() |
176 | 190 |
177 if rbranchmap: | 191 if rbranchmap: |
178 repo._branchcaches.replace(repo, rbranchmap) | 192 repo._branchcaches.replace(repo, rbranchmap) |
179 | 193 |
180 repo.invalidate() | 194 repo.invalidate() |
195 | |
181 | 196 |
182 def allowservergeneration(repo): | 197 def allowservergeneration(repo): |
183 """Whether streaming clones are allowed from the server.""" | 198 """Whether streaming clones are allowed from the server.""" |
184 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features: | 199 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features: |
185 return False | 200 return False |
193 if secret: | 208 if secret: |
194 return repo.ui.configbool('server', 'uncompressedallowsecret') | 209 return repo.ui.configbool('server', 'uncompressedallowsecret') |
195 | 210 |
196 return True | 211 return True |
197 | 212 |
213 | |
198 # This is it's own function so extensions can override it. | 214 # This is it's own function so extensions can override it. |
199 def _walkstreamfiles(repo, matcher=None): | 215 def _walkstreamfiles(repo, matcher=None): |
200 return repo.store.walk(matcher) | 216 return repo.store.walk(matcher) |
217 | |
201 | 218 |
202 def generatev1(repo): | 219 def generatev1(repo): |
203 """Emit content for version 1 of a streaming clone. | 220 """Emit content for version 1 of a streaming clone. |
204 | 221 |
205 This returns a 3-tuple of (file count, byte size, data iterator). | 222 This returns a 3-tuple of (file count, byte size, data iterator). |
226 for name, ename, size in _walkstreamfiles(repo): | 243 for name, ename, size in _walkstreamfiles(repo): |
227 if size: | 244 if size: |
228 entries.append((name, size)) | 245 entries.append((name, size)) |
229 total_bytes += size | 246 total_bytes += size |
230 | 247 |
231 repo.ui.debug('%d files, %d bytes to transfer\n' % | 248 repo.ui.debug( |
232 (len(entries), total_bytes)) | 249 '%d files, %d bytes to transfer\n' % (len(entries), total_bytes) |
250 ) | |
233 | 251 |
234 svfs = repo.svfs | 252 svfs = repo.svfs |
235 debugflag = repo.ui.debugflag | 253 debugflag = repo.ui.debugflag |
236 | 254 |
237 def emitrevlogdata(): | 255 def emitrevlogdata(): |
249 for chunk in util.filechunkiter(fp, limit=size): | 267 for chunk in util.filechunkiter(fp, limit=size): |
250 yield chunk | 268 yield chunk |
251 | 269 |
252 return len(entries), total_bytes, emitrevlogdata() | 270 return len(entries), total_bytes, emitrevlogdata() |
253 | 271 |
272 | |
254 def generatev1wireproto(repo): | 273 def generatev1wireproto(repo): |
255 """Emit content for version 1 of streaming clone suitable for the wire. | 274 """Emit content for version 1 of streaming clone suitable for the wire. |
256 | 275 |
257 This is the data output from ``generatev1()`` with 2 header lines. The | 276 This is the data output from ``generatev1()`` with 2 header lines. The |
258 first line indicates overall success. The 2nd contains the file count and | 277 first line indicates overall success. The 2nd contains the file count and |
276 yield '0\n' | 295 yield '0\n' |
277 yield '%d %d\n' % (filecount, bytecount) | 296 yield '%d %d\n' % (filecount, bytecount) |
278 for chunk in it: | 297 for chunk in it: |
279 yield chunk | 298 yield chunk |
280 | 299 |
300 | |
281 def generatebundlev1(repo, compression='UN'): | 301 def generatebundlev1(repo, compression='UN'): |
282 """Emit content for version 1 of a stream clone bundle. | 302 """Emit content for version 1 of a stream clone bundle. |
283 | 303 |
284 The first 4 bytes of the output ("HGS1") denote this as stream clone | 304 The first 4 bytes of the output ("HGS1") denote this as stream clone |
285 bundle version 1. | 305 bundle version 1. |
309 def gen(): | 329 def gen(): |
310 yield 'HGS1' | 330 yield 'HGS1' |
311 yield compression | 331 yield compression |
312 | 332 |
313 filecount, bytecount, it = generatev1(repo) | 333 filecount, bytecount, it = generatev1(repo) |
314 repo.ui.status(_('writing %d bytes for %d files\n') % | 334 repo.ui.status( |
315 (bytecount, filecount)) | 335 _('writing %d bytes for %d files\n') % (bytecount, filecount) |
336 ) | |
316 | 337 |
317 yield struct.pack('>QQ', filecount, bytecount) | 338 yield struct.pack('>QQ', filecount, bytecount) |
318 yield struct.pack('>H', len(requires) + 1) | 339 yield struct.pack('>H', len(requires) + 1) |
319 yield requires + '\0' | 340 yield requires + '\0' |
320 | 341 |
321 # This is where we'll add compression in the future. | 342 # This is where we'll add compression in the future. |
322 assert compression == 'UN' | 343 assert compression == 'UN' |
323 | 344 |
324 progress = repo.ui.makeprogress(_('bundle'), total=bytecount, | 345 progress = repo.ui.makeprogress( |
325 unit=_('bytes')) | 346 _('bundle'), total=bytecount, unit=_('bytes') |
347 ) | |
326 progress.update(0) | 348 progress.update(0) |
327 | 349 |
328 for chunk in it: | 350 for chunk in it: |
329 progress.increment(step=len(chunk)) | 351 progress.increment(step=len(chunk)) |
330 yield chunk | 352 yield chunk |
331 | 353 |
332 progress.complete() | 354 progress.complete() |
333 | 355 |
334 return requirements, gen() | 356 return requirements, gen() |
335 | 357 |
358 | |
336 def consumev1(repo, fp, filecount, bytecount): | 359 def consumev1(repo, fp, filecount, bytecount): |
337 """Apply the contents from version 1 of a streaming clone file handle. | 360 """Apply the contents from version 1 of a streaming clone file handle. |
338 | 361 |
339 This takes the output from "stream_out" and applies it to the specified | 362 This takes the output from "stream_out" and applies it to the specified |
340 repository. | 363 repository. |
341 | 364 |
342 Like "stream_out," the status line added by the wire protocol is not | 365 Like "stream_out," the status line added by the wire protocol is not |
343 handled by this function. | 366 handled by this function. |
344 """ | 367 """ |
345 with repo.lock(): | 368 with repo.lock(): |
346 repo.ui.status(_('%d files to transfer, %s of data\n') % | 369 repo.ui.status( |
347 (filecount, util.bytecount(bytecount))) | 370 _('%d files to transfer, %s of data\n') |
348 progress = repo.ui.makeprogress(_('clone'), total=bytecount, | 371 % (filecount, util.bytecount(bytecount)) |
349 unit=_('bytes')) | 372 ) |
373 progress = repo.ui.makeprogress( | |
374 _('clone'), total=bytecount, unit=_('bytes') | |
375 ) | |
350 progress.update(0) | 376 progress.update(0) |
351 start = util.timer() | 377 start = util.timer() |
352 | 378 |
353 # TODO: get rid of (potential) inconsistency | 379 # TODO: get rid of (potential) inconsistency |
354 # | 380 # |
372 try: | 398 try: |
373 name, size = l.split('\0', 1) | 399 name, size = l.split('\0', 1) |
374 size = int(size) | 400 size = int(size) |
375 except (ValueError, TypeError): | 401 except (ValueError, TypeError): |
376 raise error.ResponseError( | 402 raise error.ResponseError( |
377 _('unexpected response from remote server:'), l) | 403 _('unexpected response from remote server:'), l |
404 ) | |
378 if repo.ui.debugflag: | 405 if repo.ui.debugflag: |
379 repo.ui.debug('adding %s (%s)\n' % | 406 repo.ui.debug( |
380 (name, util.bytecount(size))) | 407 'adding %s (%s)\n' % (name, util.bytecount(size)) |
408 ) | |
381 # for backwards compat, name was partially encoded | 409 # for backwards compat, name was partially encoded |
382 path = store.decodedir(name) | 410 path = store.decodedir(name) |
383 with repo.svfs(path, 'w', backgroundclose=True) as ofp: | 411 with repo.svfs(path, 'w', backgroundclose=True) as ofp: |
384 for chunk in util.filechunkiter(fp, limit=size): | 412 for chunk in util.filechunkiter(fp, limit=size): |
385 progress.increment(step=len(chunk)) | 413 progress.increment(step=len(chunk)) |
391 | 419 |
392 elapsed = util.timer() - start | 420 elapsed = util.timer() - start |
393 if elapsed <= 0: | 421 if elapsed <= 0: |
394 elapsed = 0.001 | 422 elapsed = 0.001 |
395 progress.complete() | 423 progress.complete() |
396 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % | 424 repo.ui.status( |
397 (util.bytecount(bytecount), elapsed, | 425 _('transferred %s in %.1f seconds (%s/sec)\n') |
398 util.bytecount(bytecount / elapsed))) | 426 % ( |
427 util.bytecount(bytecount), | |
428 elapsed, | |
429 util.bytecount(bytecount / elapsed), | |
430 ) | |
431 ) | |
432 | |
399 | 433 |
400 def readbundle1header(fp): | 434 def readbundle1header(fp): |
401 compression = fp.read(2) | 435 compression = fp.read(2) |
402 if compression != 'UN': | 436 if compression != 'UN': |
403 raise error.Abort(_('only uncompressed stream clone bundles are ' | 437 raise error.Abort( |
404 'supported; got %s') % compression) | 438 _('only uncompressed stream clone bundles are ' 'supported; got %s') |
439 % compression | |
440 ) | |
405 | 441 |
406 filecount, bytecount = struct.unpack('>QQ', fp.read(16)) | 442 filecount, bytecount = struct.unpack('>QQ', fp.read(16)) |
407 requireslen = struct.unpack('>H', fp.read(2))[0] | 443 requireslen = struct.unpack('>H', fp.read(2))[0] |
408 requires = fp.read(requireslen) | 444 requires = fp.read(requireslen) |
409 | 445 |
410 if not requires.endswith('\0'): | 446 if not requires.endswith('\0'): |
411 raise error.Abort(_('malformed stream clone bundle: ' | 447 raise error.Abort( |
412 'requirements not properly encoded')) | 448 _( |
449 'malformed stream clone bundle: ' | |
450 'requirements not properly encoded' | |
451 ) | |
452 ) | |
413 | 453 |
414 requirements = set(requires.rstrip('\0').split(',')) | 454 requirements = set(requires.rstrip('\0').split(',')) |
415 | 455 |
416 return filecount, bytecount, requirements | 456 return filecount, bytecount, requirements |
457 | |
417 | 458 |
418 def applybundlev1(repo, fp): | 459 def applybundlev1(repo, fp): |
419 """Apply the content from a stream clone bundle version 1. | 460 """Apply the content from a stream clone bundle version 1. |
420 | 461 |
421 We assume the 4 byte header has been read and validated and the file handle | 462 We assume the 4 byte header has been read and validated and the file handle |
422 is at the 2 byte compression identifier. | 463 is at the 2 byte compression identifier. |
423 """ | 464 """ |
424 if len(repo): | 465 if len(repo): |
425 raise error.Abort(_('cannot apply stream clone bundle on non-empty ' | 466 raise error.Abort( |
426 'repo')) | 467 _('cannot apply stream clone bundle on non-empty ' 'repo') |
468 ) | |
427 | 469 |
428 filecount, bytecount, requirements = readbundle1header(fp) | 470 filecount, bytecount, requirements = readbundle1header(fp) |
429 missingreqs = requirements - repo.supportedformats | 471 missingreqs = requirements - repo.supportedformats |
430 if missingreqs: | 472 if missingreqs: |
431 raise error.Abort(_('unable to apply stream clone: ' | 473 raise error.Abort( |
432 'unsupported format: %s') % | 474 _('unable to apply stream clone: ' 'unsupported format: %s') |
433 ', '.join(sorted(missingreqs))) | 475 % ', '.join(sorted(missingreqs)) |
476 ) | |
434 | 477 |
435 consumev1(repo, fp, filecount, bytecount) | 478 consumev1(repo, fp, filecount, bytecount) |
479 | |
436 | 480 |
437 class streamcloneapplier(object): | 481 class streamcloneapplier(object): |
438 """Class to manage applying streaming clone bundles. | 482 """Class to manage applying streaming clone bundles. |
439 | 483 |
440 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle | 484 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle |
441 readers to perform bundle type-specific functionality. | 485 readers to perform bundle type-specific functionality. |
442 """ | 486 """ |
487 | |
443 def __init__(self, fh): | 488 def __init__(self, fh): |
444 self._fh = fh | 489 self._fh = fh |
445 | 490 |
446 def apply(self, repo): | 491 def apply(self, repo): |
447 return applybundlev1(repo, self._fh) | 492 return applybundlev1(repo, self._fh) |
448 | 493 |
494 | |
449 # type of file to stream | 495 # type of file to stream |
450 _fileappend = 0 # append only file | 496 _fileappend = 0 # append only file |
451 _filefull = 1 # full snapshot file | 497 _filefull = 1 # full snapshot file |
452 | 498 |
453 # Source of the file | 499 # Source of the file |
454 _srcstore = 's' # store (svfs) | 500 _srcstore = 's' # store (svfs) |
455 _srccache = 'c' # cache (cache) | 501 _srccache = 'c' # cache (cache) |
456 | 502 |
457 # This is it's own function so extensions can override it. | 503 # This is it's own function so extensions can override it. |
458 def _walkstreamfullstorefiles(repo): | 504 def _walkstreamfullstorefiles(repo): |
459 """list snapshot file from the store""" | 505 """list snapshot file from the store""" |
460 fnames = [] | 506 fnames = [] |
461 if not repo.publishing(): | 507 if not repo.publishing(): |
462 fnames.append('phaseroots') | 508 fnames.append('phaseroots') |
463 return fnames | 509 return fnames |
464 | 510 |
511 | |
465 def _filterfull(entry, copy, vfsmap): | 512 def _filterfull(entry, copy, vfsmap): |
466 """actually copy the snapshot files""" | 513 """actually copy the snapshot files""" |
467 src, name, ftype, data = entry | 514 src, name, ftype, data = entry |
468 if ftype != _filefull: | 515 if ftype != _filefull: |
469 return entry | 516 return entry |
470 return (src, name, ftype, copy(vfsmap[src].join(name))) | 517 return (src, name, ftype, copy(vfsmap[src].join(name))) |
471 | 518 |
519 | |
472 @contextlib.contextmanager | 520 @contextlib.contextmanager |
473 def maketempcopies(): | 521 def maketempcopies(): |
474 """return a function to temporary copy file""" | 522 """return a function to temporary copy file""" |
475 files = [] | 523 files = [] |
476 try: | 524 try: |
525 | |
477 def copy(src): | 526 def copy(src): |
478 fd, dst = pycompat.mkstemp() | 527 fd, dst = pycompat.mkstemp() |
479 os.close(fd) | 528 os.close(fd) |
480 files.append(dst) | 529 files.append(dst) |
481 util.copyfiles(src, dst, hardlink=True) | 530 util.copyfiles(src, dst, hardlink=True) |
482 return dst | 531 return dst |
532 | |
483 yield copy | 533 yield copy |
484 finally: | 534 finally: |
485 for tmp in files: | 535 for tmp in files: |
486 util.tryunlink(tmp) | 536 util.tryunlink(tmp) |
537 | |
487 | 538 |
488 def _makemap(repo): | 539 def _makemap(repo): |
489 """make a (src -> vfs) map for the repo""" | 540 """make a (src -> vfs) map for the repo""" |
490 vfsmap = { | 541 vfsmap = { |
491 _srcstore: repo.svfs, | 542 _srcstore: repo.svfs, |
495 # (eg: .hg/hgrc) | 546 # (eg: .hg/hgrc) |
496 assert repo.vfs not in vfsmap.values() | 547 assert repo.vfs not in vfsmap.values() |
497 | 548 |
498 return vfsmap | 549 return vfsmap |
499 | 550 |
551 | |
500 def _emit2(repo, entries, totalfilesize): | 552 def _emit2(repo, entries, totalfilesize): |
501 """actually emit the stream bundle""" | 553 """actually emit the stream bundle""" |
502 vfsmap = _makemap(repo) | 554 vfsmap = _makemap(repo) |
503 progress = repo.ui.makeprogress(_('bundle'), total=totalfilesize, | 555 progress = repo.ui.makeprogress( |
504 unit=_('bytes')) | 556 _('bundle'), total=totalfilesize, unit=_('bytes') |
557 ) | |
505 progress.update(0) | 558 progress.update(0) |
506 with maketempcopies() as copy, progress: | 559 with maketempcopies() as copy, progress: |
507 # copy is delayed until we are in the try | 560 # copy is delayed until we are in the try |
508 entries = [_filterfull(e, copy, vfsmap) for e in entries] | 561 entries = [_filterfull(e, copy, vfsmap) for e in entries] |
509 yield None # this release the lock on the repository | 562 yield None # this release the lock on the repository |
510 seen = 0 | 563 seen = 0 |
511 | 564 |
512 for src, name, ftype, data in entries: | 565 for src, name, ftype, data in entries: |
513 vfs = vfsmap[src] | 566 vfs = vfsmap[src] |
514 yield src | 567 yield src |
531 progress.update(seen) | 584 progress.update(seen) |
532 yield chunk | 585 yield chunk |
533 finally: | 586 finally: |
534 fp.close() | 587 fp.close() |
535 | 588 |
589 | |
536 def generatev2(repo, includes, excludes, includeobsmarkers): | 590 def generatev2(repo, includes, excludes, includeobsmarkers): |
537 """Emit content for version 2 of a streaming clone. | 591 """Emit content for version 2 of a streaming clone. |
538 | 592 |
539 the data stream consists the following entries: | 593 the data stream consists the following entries: |
540 1) A char representing the file destination (eg: store or cache) | 594 1) A char representing the file destination (eg: store or cache) |
576 first = next(chunks) | 630 first = next(chunks) |
577 assert first is None | 631 assert first is None |
578 | 632 |
579 return len(entries), totalfilesize, chunks | 633 return len(entries), totalfilesize, chunks |
580 | 634 |
635 | |
581 @contextlib.contextmanager | 636 @contextlib.contextmanager |
582 def nested(*ctxs): | 637 def nested(*ctxs): |
583 this = ctxs[0] | 638 this = ctxs[0] |
584 rest = ctxs[1:] | 639 rest = ctxs[1:] |
585 with this: | 640 with this: |
587 with nested(*rest): | 642 with nested(*rest): |
588 yield | 643 yield |
589 else: | 644 else: |
590 yield | 645 yield |
591 | 646 |
647 | |
592 def consumev2(repo, fp, filecount, filesize): | 648 def consumev2(repo, fp, filecount, filesize): |
593 """Apply the contents from a version 2 streaming clone. | 649 """Apply the contents from a version 2 streaming clone. |
594 | 650 |
595 Data is read from an object that only needs to provide a ``read(size)`` | 651 Data is read from an object that only needs to provide a ``read(size)`` |
596 method. | 652 method. |
597 """ | 653 """ |
598 with repo.lock(): | 654 with repo.lock(): |
599 repo.ui.status(_('%d files to transfer, %s of data\n') % | 655 repo.ui.status( |
600 (filecount, util.bytecount(filesize))) | 656 _('%d files to transfer, %s of data\n') |
657 % (filecount, util.bytecount(filesize)) | |
658 ) | |
601 | 659 |
602 start = util.timer() | 660 start = util.timer() |
603 progress = repo.ui.makeprogress(_('clone'), total=filesize, | 661 progress = repo.ui.makeprogress( |
604 unit=_('bytes')) | 662 _('clone'), total=filesize, unit=_('bytes') |
663 ) | |
605 progress.update(0) | 664 progress.update(0) |
606 | 665 |
607 vfsmap = _makemap(repo) | 666 vfsmap = _makemap(repo) |
608 | 667 |
609 with repo.transaction('clone'): | 668 with repo.transaction('clone'): |
610 ctxs = (vfs.backgroundclosing(repo.ui) | 669 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) |
611 for vfs in vfsmap.values()) | |
612 with nested(*ctxs): | 670 with nested(*ctxs): |
613 for i in range(filecount): | 671 for i in range(filecount): |
614 src = util.readexactly(fp, 1) | 672 src = util.readexactly(fp, 1) |
615 vfs = vfsmap[src] | 673 vfs = vfsmap[src] |
616 namelen = util.uvarintdecodestream(fp) | 674 namelen = util.uvarintdecodestream(fp) |
617 datalen = util.uvarintdecodestream(fp) | 675 datalen = util.uvarintdecodestream(fp) |
618 | 676 |
619 name = util.readexactly(fp, namelen) | 677 name = util.readexactly(fp, namelen) |
620 | 678 |
621 if repo.ui.debugflag: | 679 if repo.ui.debugflag: |
622 repo.ui.debug('adding [%s] %s (%s)\n' % | 680 repo.ui.debug( |
623 (src, name, util.bytecount(datalen))) | 681 'adding [%s] %s (%s)\n' |
682 % (src, name, util.bytecount(datalen)) | |
683 ) | |
624 | 684 |
625 with vfs(name, 'w') as ofp: | 685 with vfs(name, 'w') as ofp: |
626 for chunk in util.filechunkiter(fp, limit=datalen): | 686 for chunk in util.filechunkiter(fp, limit=datalen): |
627 progress.increment(step=len(chunk)) | 687 progress.increment(step=len(chunk)) |
628 ofp.write(chunk) | 688 ofp.write(chunk) |
632 repo.invalidate(clearfilecache=True) | 692 repo.invalidate(clearfilecache=True) |
633 | 693 |
634 elapsed = util.timer() - start | 694 elapsed = util.timer() - start |
635 if elapsed <= 0: | 695 if elapsed <= 0: |
636 elapsed = 0.001 | 696 elapsed = 0.001 |
637 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % | 697 repo.ui.status( |
638 (util.bytecount(progress.pos), elapsed, | 698 _('transferred %s in %.1f seconds (%s/sec)\n') |
639 util.bytecount(progress.pos / elapsed))) | 699 % ( |
700 util.bytecount(progress.pos), | |
701 elapsed, | |
702 util.bytecount(progress.pos / elapsed), | |
703 ) | |
704 ) | |
640 progress.complete() | 705 progress.complete() |
706 | |
641 | 707 |
642 def applybundlev2(repo, fp, filecount, filesize, requirements): | 708 def applybundlev2(repo, fp, filecount, filesize, requirements): |
643 from . import localrepo | 709 from . import localrepo |
644 | 710 |
645 missingreqs = [r for r in requirements if r not in repo.supported] | 711 missingreqs = [r for r in requirements if r not in repo.supported] |
646 if missingreqs: | 712 if missingreqs: |
647 raise error.Abort(_('unable to apply stream clone: ' | 713 raise error.Abort( |
648 'unsupported format: %s') % | 714 _('unable to apply stream clone: ' 'unsupported format: %s') |
649 ', '.join(sorted(missingreqs))) | 715 % ', '.join(sorted(missingreqs)) |
716 ) | |
650 | 717 |
651 consumev2(repo, fp, filecount, filesize) | 718 consumev2(repo, fp, filecount, filesize) |
652 | 719 |
653 # new requirements = old non-format requirements + | 720 # new requirements = old non-format requirements + |
654 # new format-related remote requirements | 721 # new format-related remote requirements |
655 # requirements from the streamed-in repository | 722 # requirements from the streamed-in repository |
656 repo.requirements = set(requirements) | ( | 723 repo.requirements = set(requirements) | ( |
657 repo.requirements - repo.supportedformats) | 724 repo.requirements - repo.supportedformats |
725 ) | |
658 repo.svfs.options = localrepo.resolvestorevfsoptions( | 726 repo.svfs.options = localrepo.resolvestorevfsoptions( |
659 repo.ui, repo.requirements, repo.features) | 727 repo.ui, repo.requirements, repo.features |
728 ) | |
660 repo._writerequirements() | 729 repo._writerequirements() |