Mercurial > hg
view mercurial/exchangev2.py @ 45095:8e04607023e5
procutil: ensure that procutil.std{out,err}.write() writes all bytes
Python 3 offers different kind of streams and it’s not guaranteed for all of
them that calling write() writes all bytes.
When Python is started in unbuffered mode, sys.std{out,err}.buffer are
instances of io.FileIO, whose write() can write less bytes for
platform-specific reasons (e.g. Linux has a 0x7ffff000 bytes maximum and could
write less if interrupted by a signal; when writing to Windows consoles, it’s
limited to 32767 bytes to avoid the "not enough space" error). This can lead to
silent loss of data, both when using sys.std{out,err}.buffer (which may in fact
not be a buffered stream) and when using the text streams sys.std{out,err}
(I’ve created a CPython bug report for that:
https://bugs.python.org/issue41221).
Python may fix the problem at some point. For now, we implement our own wrapper
for procutil.std{out,err} that calls the raw stream’s write() method until all
bytes have been written. We don’t use sys.std{out,err} for larger writes, so I
think it’s not worth the effort to patch them.
author | Manuel Jacob <me@manueljacob.de> |
---|---|
date | Fri, 10 Jul 2020 12:27:58 +0200 |
parents | a166fadf5c3b |
children | b1e51ef4e536 |
line wrap: on
line source
# exchangev2.py - repository exchange for wire protocol version 2 # # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import absolute_import import collections import weakref from .i18n import _ from .node import ( nullid, short, ) from . import ( bookmarks, error, mdiff, narrowspec, phases, pycompat, setdiscovery, ) from .interfaces import repository def pull(pullop): """Pull using wire protocol version 2.""" repo = pullop.repo remote = pullop.remote usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop) # If this is a clone and it was requested to perform a "stream clone", # we obtain the raw files data from the remote then fall back to an # incremental pull. This is somewhat hacky and is not nearly robust enough # for long-term usage. if usingrawchangelogandmanifest: with repo.transaction(b'clone'): _fetchrawstorefiles(repo, remote) repo.invalidate(clearfilecache=True) tr = pullop.trmanager.transaction() # We don't use the repo's narrow matcher here because the patterns passed # to exchange.pull() could be different. narrowmatcher = narrowspec.match( repo.root, # Empty maps to nevermatcher. So always # set includes if missing. pullop.includepats or {b'path:.'}, pullop.excludepats, ) if pullop.includepats or pullop.excludepats: pathfilter = {} if pullop.includepats: pathfilter[b'include'] = sorted(pullop.includepats) if pullop.excludepats: pathfilter[b'exclude'] = sorted(pullop.excludepats) else: pathfilter = None # Figure out what needs to be fetched. common, fetch, remoteheads = _pullchangesetdiscovery( repo, remote, pullop.heads, abortwhenunrelated=pullop.force ) # And fetch the data. pullheads = pullop.heads or remoteheads csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads) # New revisions are written to the changelog. But all other updates # are deferred. Do those now. # Ensure all new changesets are draft by default. If the repo is # publishing, the phase will be adjusted by the loop below. if csetres[b'added']: phases.registernew(repo, tr, phases.draft, csetres[b'added']) # And adjust the phase of all changesets accordingly. for phase in phases.phasenames: if phase == b'secret' or not csetres[b'nodesbyphase'][phase]: continue phases.advanceboundary( repo, tr, phases.phasenames.index(phase), csetres[b'nodesbyphase'][phase], ) # Write bookmark updates. bookmarks.updatefromremote( repo.ui, repo, csetres[b'bookmarks'], remote.url(), pullop.gettransaction, explicit=pullop.explicitbookmarks, ) manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes']) # We don't properly support shallow changeset and manifest yet. So we apply # depth limiting locally. if pullop.depth: relevantcsetnodes = set() clnode = repo.changelog.node for rev in repo.revs( b'ancestors(%ln, %s)', pullheads, pullop.depth - 1 ): relevantcsetnodes.add(clnode(rev)) csetrelevantfilter = lambda n: n in relevantcsetnodes else: csetrelevantfilter = lambda n: True # If obtaining the raw store files, we need to scan the full repo to # derive all the changesets, manifests, and linkrevs. if usingrawchangelogandmanifest: csetsforfiles = [] mnodesforfiles = [] manifestlinkrevs = {} for rev in repo: ctx = repo[rev] node = ctx.node() if not csetrelevantfilter(node): continue mnode = ctx.manifestnode() csetsforfiles.append(node) mnodesforfiles.append(mnode) manifestlinkrevs[mnode] = rev else: csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)] mnodesforfiles = manres[b'added'] manifestlinkrevs = manres[b'linkrevs'] # Find all file nodes referenced by added manifests and fetch those # revisions. fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles) _fetchfilesfromcsets( repo, tr, remote, pathfilter, fnodes, csetsforfiles, manifestlinkrevs, shallow=bool(pullop.depth), ) def _checkuserawstorefiledata(pullop): """Check whether we should use rawstorefiledata command to retrieve data.""" repo = pullop.repo remote = pullop.remote # Command to obtain raw store data isn't available. if b'rawstorefiledata' not in remote.apidescriptor[b'commands']: return False # Only honor if user requested stream clone operation. if not pullop.streamclonerequested: return False # Only works on empty repos. if len(repo): return False # TODO This is super hacky. There needs to be a storage API for this. We # also need to check for compatibility with the remote. if b'revlogv1' not in repo.requirements: return False return True def _fetchrawstorefiles(repo, remote): with remote.commandexecutor() as e: objs = e.callcommand( b'rawstorefiledata', {b'files': [b'changelog', b'manifestlog'],} ).result() # First object is a summary of files data that follows. overall = next(objs) progress = repo.ui.makeprogress( _(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes') ) with progress: progress.update(0) # Next are pairs of file metadata, data. while True: try: filemeta = next(objs) except StopIteration: break for k in (b'location', b'path', b'size'): if k not in filemeta: raise error.Abort( _(b'remote file data missing key: %s') % k ) if filemeta[b'location'] == b'store': vfs = repo.svfs else: raise error.Abort( _(b'invalid location for raw file data: %s') % filemeta[b'location'] ) bytesremaining = filemeta[b'size'] with vfs.open(filemeta[b'path'], b'wb') as fh: while True: try: chunk = next(objs) except StopIteration: break bytesremaining -= len(chunk) if bytesremaining < 0: raise error.Abort( _( b'received invalid number of bytes for file ' b'data; expected %d, got extra' ) % filemeta[b'size'] ) progress.increment(step=len(chunk)) fh.write(chunk) try: if chunk.islast: break except AttributeError: raise error.Abort( _( b'did not receive indefinite length bytestring ' b'for file data' ) ) if bytesremaining: raise error.Abort( _( b'received invalid number of bytes for' b'file data; expected %d got %d' ) % ( filemeta[b'size'], filemeta[b'size'] - bytesremaining, ) ) def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): """Determine which changesets need to be pulled.""" if heads: knownnode = repo.changelog.hasnode if all(knownnode(head) for head in heads): return heads, False, heads # TODO wire protocol version 2 is capable of more efficient discovery # than setdiscovery. Consider implementing something better. common, fetch, remoteheads = setdiscovery.findcommonheads( repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated ) common = set(common) remoteheads = set(remoteheads) # If a remote head is filtered locally, put it back in the common set. # See the comment in exchange._pulldiscoverychangegroup() for more. if fetch and remoteheads: has_node = repo.unfiltered().changelog.index.has_node common |= {head for head in remoteheads if has_node(head)} if set(remoteheads).issubset(common): fetch = [] common.discard(nullid) return common, fetch, remoteheads def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads): # TODO consider adding a step here where we obtain the DAG shape first # (or ask the server to slice changesets into chunks for us) so that # we can perform multiple fetches in batches. This will facilitate # resuming interrupted clones, higher server-side cache hit rates due # to smaller segments, etc. with remote.commandexecutor() as e: objs = e.callcommand( b'changesetdata', { b'revisions': [ { b'type': b'changesetdagrange', b'roots': sorted(common), b'heads': sorted(remoteheads), } ], b'fields': {b'bookmarks', b'parents', b'phase', b'revision'}, }, ).result() # The context manager waits on all response data when exiting. So # we need to remain in the context manager in order to stream data. return _processchangesetdata(repo, tr, objs) def _processchangesetdata(repo, tr, objs): repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs)) urepo = repo.unfiltered() cl = urepo.changelog cl.delayupdate(tr) # The first emitted object is a header describing the data that # follows. meta = next(objs) progress = repo.ui.makeprogress( _(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems') ) manifestnodes = {} def linkrev(node): repo.ui.debug(b'add changeset %s\n' % short(node)) # Linkrev for changelog is always self. return len(cl) def onchangeset(cl, node): progress.increment() revision = cl.changelogrevision(node) # We need to preserve the mapping of changelog revision to node # so we can set the linkrev accordingly when manifests are added. manifestnodes[cl.rev(node)] = revision.manifest nodesbyphase = {phase: set() for phase in phases.phasenames} remotebookmarks = {} # addgroup() expects a 7-tuple describing revisions. This normalizes # the wire data to that format. # # This loop also aggregates non-revision metadata, such as phase # data. def iterrevisions(): for cset in objs: node = cset[b'node'] if b'phase' in cset: nodesbyphase[cset[b'phase']].add(node) for mark in cset.get(b'bookmarks', []): remotebookmarks[mark] = node # TODO add mechanism for extensions to examine records so they # can siphon off custom data fields. extrafields = {} for field, size in cset.get(b'fieldsfollowing', []): extrafields[field] = next(objs) # Some entries might only be metadata only updates. if b'revision' not in extrafields: continue data = extrafields[b'revision'] yield ( node, cset[b'parents'][0], cset[b'parents'][1], # Linknode is always itself for changesets. cset[b'node'], # We always send full revisions. So delta base is not set. nullid, mdiff.trivialdiffheader(len(data)) + data, # Flags not yet supported. 0, ) added = cl.addgroup( iterrevisions(), linkrev, weakref.proxy(tr), addrevisioncb=onchangeset ) progress.complete() return { b'added': added, b'nodesbyphase': nodesbyphase, b'bookmarks': remotebookmarks, b'manifestnodes': manifestnodes, } def _fetchmanifests(repo, tr, remote, manifestnodes): rootmanifest = repo.manifestlog.getstorage(b'') # Some manifests can be shared between changesets. Filter out revisions # we already know about. fetchnodes = [] linkrevs = {} seen = set() for clrev, node in sorted(pycompat.iteritems(manifestnodes)): if node in seen: continue try: rootmanifest.rev(node) except error.LookupError: fetchnodes.append(node) linkrevs[node] = clrev seen.add(node) # TODO handle tree manifests # addgroup() expects 7-tuple describing revisions. This normalizes # the wire data to that format. def iterrevisions(objs, progress): for manifest in objs: node = manifest[b'node'] extrafields = {} for field, size in manifest.get(b'fieldsfollowing', []): extrafields[field] = next(objs) if b'delta' in extrafields: basenode = manifest[b'deltabasenode'] delta = extrafields[b'delta'] elif b'revision' in extrafields: basenode = nullid revision = extrafields[b'revision'] delta = mdiff.trivialdiffheader(len(revision)) + revision else: continue yield ( node, manifest[b'parents'][0], manifest[b'parents'][1], # The value passed in is passed to the lookup function passed # to addgroup(). We already have a map of manifest node to # changelog revision number. So we just pass in the # manifest node here and use linkrevs.__getitem__ as the # resolution function. node, basenode, delta, # Flags not yet supported. 0, ) progress.increment() progress = repo.ui.makeprogress( _(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes) ) commandmeta = remote.apidescriptor[b'commands'][b'manifestdata'] batchsize = commandmeta.get(b'recommendedbatchsize', 10000) # TODO make size configurable on client? # We send commands 1 at a time to the remote. This is not the most # efficient because we incur a round trip at the end of each batch. # However, the existing frame-based reactor keeps consuming server # data in the background. And this results in response data buffering # in memory. This can consume gigabytes of memory. # TODO send multiple commands in a request once background buffering # issues are resolved. added = [] for i in pycompat.xrange(0, len(fetchnodes), batchsize): batch = [node for node in fetchnodes[i : i + batchsize]] if not batch: continue with remote.commandexecutor() as e: objs = e.callcommand( b'manifestdata', { b'tree': b'', b'nodes': batch, b'fields': {b'parents', b'revision'}, b'haveparents': True, }, ).result() # Chomp off header object. next(objs) added.extend( rootmanifest.addgroup( iterrevisions(objs, progress), linkrevs.__getitem__, weakref.proxy(tr), ) ) progress.complete() return { b'added': added, b'linkrevs': linkrevs, } def _derivefilesfrommanifests(repo, matcher, manifestnodes): """Determine what file nodes are relevant given a set of manifest nodes. Returns a dict mapping file paths to dicts of file node to first manifest node. """ ml = repo.manifestlog fnodes = collections.defaultdict(dict) progress = repo.ui.makeprogress( _(b'scanning manifests'), total=len(manifestnodes) ) with progress: for manifestnode in manifestnodes: m = ml.get(b'', manifestnode) # TODO this will pull in unwanted nodes because it takes the storage # delta into consideration. What we really want is something that # takes the delta between the manifest's parents. And ideally we # would ignore file nodes that are known locally. For now, ignore # both these limitations. This will result in incremental fetches # requesting data we already have. So this is far from ideal. md = m.readfast() for path, fnode in md.items(): if matcher(path): fnodes[path].setdefault(fnode, manifestnode) progress.increment() return fnodes def _fetchfiles(repo, tr, remote, fnodes, linkrevs): """Fetch file data from explicit file revisions.""" def iterrevisions(objs, progress): for filerevision in objs: node = filerevision[b'node'] extrafields = {} for field, size in filerevision.get(b'fieldsfollowing', []): extrafields[field] = next(objs) if b'delta' in extrafields: basenode = filerevision[b'deltabasenode'] delta = extrafields[b'delta'] elif b'revision' in extrafields: basenode = nullid revision = extrafields[b'revision'] delta = mdiff.trivialdiffheader(len(revision)) + revision else: continue yield ( node, filerevision[b'parents'][0], filerevision[b'parents'][1], node, basenode, delta, # Flags not yet supported. 0, ) progress.increment() progress = repo.ui.makeprogress( _(b'files'), unit=_(b'chunks'), total=sum(len(v) for v in pycompat.itervalues(fnodes)), ) # TODO make batch size configurable batchsize = 10000 fnodeslist = [x for x in sorted(fnodes.items())] for i in pycompat.xrange(0, len(fnodeslist), batchsize): batch = [x for x in fnodeslist[i : i + batchsize]] if not batch: continue with remote.commandexecutor() as e: fs = [] locallinkrevs = {} for path, nodes in batch: fs.append( ( path, e.callcommand( b'filedata', { b'path': path, b'nodes': sorted(nodes), b'fields': {b'parents', b'revision'}, b'haveparents': True, }, ), ) ) locallinkrevs[path] = { node: linkrevs[manifestnode] for node, manifestnode in pycompat.iteritems(nodes) } for path, f in fs: objs = f.result() # Chomp off header objects. next(objs) store = repo.file(path) store.addgroup( iterrevisions(objs, progress), locallinkrevs[path].__getitem__, weakref.proxy(tr), ) def _fetchfilesfromcsets( repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False ): """Fetch file data from explicit changeset revisions.""" def iterrevisions(objs, remaining, progress): while remaining: filerevision = next(objs) node = filerevision[b'node'] extrafields = {} for field, size in filerevision.get(b'fieldsfollowing', []): extrafields[field] = next(objs) if b'delta' in extrafields: basenode = filerevision[b'deltabasenode'] delta = extrafields[b'delta'] elif b'revision' in extrafields: basenode = nullid revision = extrafields[b'revision'] delta = mdiff.trivialdiffheader(len(revision)) + revision else: continue if b'linknode' in filerevision: linknode = filerevision[b'linknode'] else: linknode = node yield ( node, filerevision[b'parents'][0], filerevision[b'parents'][1], linknode, basenode, delta, # Flags not yet supported. 0, ) progress.increment() remaining -= 1 progress = repo.ui.makeprogress( _(b'files'), unit=_(b'chunks'), total=sum(len(v) for v in pycompat.itervalues(fnodes)), ) commandmeta = remote.apidescriptor[b'commands'][b'filesdata'] batchsize = commandmeta.get(b'recommendedbatchsize', 50000) shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features fields = {b'parents', b'revision'} clrev = repo.changelog.rev # There are no guarantees that we'll have ancestor revisions if # a) this repo has shallow file storage b) shallow data fetching is enabled. # Force remote to not delta against possibly unknown revisions when these # conditions hold. haveparents = not (shallowfiles or shallow) # Similarly, we may not have calculated linkrevs for all incoming file # revisions. Ask the remote to do work for us in this case. if not haveparents: fields.add(b'linknode') for i in pycompat.xrange(0, len(csets), batchsize): batch = [x for x in csets[i : i + batchsize]] if not batch: continue with remote.commandexecutor() as e: args = { b'revisions': [ {b'type': b'changesetexplicit', b'nodes': batch,} ], b'fields': fields, b'haveparents': haveparents, } if pathfilter: args[b'pathfilter'] = pathfilter objs = e.callcommand(b'filesdata', args).result() # First object is an overall header. overall = next(objs) # We have overall['totalpaths'] segments. for i in pycompat.xrange(overall[b'totalpaths']): header = next(objs) path = header[b'path'] store = repo.file(path) linkrevs = { fnode: manlinkrevs[mnode] for fnode, mnode in pycompat.iteritems(fnodes[path]) } def getlinkrev(node): if node in linkrevs: return linkrevs[node] else: return clrev(node) store.addgroup( iterrevisions(objs, header[b'totalitems'], progress), getlinkrev, weakref.proxy(tr), maybemissingparents=shallow, )