comparison mercurial/exchangev2.py @ 40330:00a4cd368e3f

exchangev2: support for calling rawstorefiledata to retrieve raw files This is somewhat hacky. For that I apologize. At the 4.8 Sprint, we decided we wanted to land support in wireprotov2 for doing a partial clone with changelog and manifestlog bootstrapped from a "stream clone" like primitive. This commit implements the client-side bits necessary to facilitate that. If the new server-side command for obtaining raw files data is available, we call it to get the raw files for the changelog and manifestlog. Then we fall through to an incremental pull. But when fetching files data, instead of using the list of changesets and manifests that we fetched via the "changesetdata" command, we do a linear scan of the repo and resolve the changeset and manifest nodes along with the manifest linkrevs. Differential Revision: https://phab.mercurial-scm.org/D5135
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 17 Oct 2018 10:10:05 +0200
parents 55836a34f41b
children 229d23cdb203
comparison
equal deleted inserted replaced
40329:ed55a0077490 40330:00a4cd368e3f
27 27
28 def pull(pullop): 28 def pull(pullop):
29 """Pull using wire protocol version 2.""" 29 """Pull using wire protocol version 2."""
30 repo = pullop.repo 30 repo = pullop.repo
31 remote = pullop.remote 31 remote = pullop.remote
32
33 usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)
34
35 # If this is a clone and it was requested to perform a "stream clone",
36 # we obtain the raw files data from the remote then fall back to an
37 # incremental pull. This is somewhat hacky and is not nearly robust enough
38 # for long-term usage.
39 if usingrawchangelogandmanifest:
40 with repo.transaction('clone'):
41 _fetchrawstorefiles(repo, remote)
42 repo.invalidate(clearfilecache=True)
43
32 tr = pullop.trmanager.transaction() 44 tr = pullop.trmanager.transaction()
33 45
34 # We don't use the repo's narrow matcher here because the patterns passed 46 # We don't use the repo's narrow matcher here because the patterns passed
35 # to exchange.pull() could be different. 47 # to exchange.pull() could be different.
36 narrowmatcher = narrowspec.match(repo.root, 48 narrowmatcher = narrowspec.match(repo.root,
77 remote.url(), pullop.gettransaction, 89 remote.url(), pullop.gettransaction,
78 explicit=pullop.explicitbookmarks) 90 explicit=pullop.explicitbookmarks)
79 91
80 manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes']) 92 manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
81 93
94 # If obtaining the raw store files, we need to scan the full repo to
95 # derive all the changesets, manifests, and linkrevs.
96 if usingrawchangelogandmanifest:
97 csetsforfiles = []
98 mnodesforfiles = []
99 manifestlinkrevs = {}
100
101 for rev in repo:
102 ctx = repo[rev]
103 mnode = ctx.manifestnode()
104
105 csetsforfiles.append(ctx.node())
106 mnodesforfiles.append(mnode)
107 manifestlinkrevs[mnode] = rev
108
109 else:
110 csetsforfiles = csetres['added']
111 mnodesforfiles = manres['added']
112 manifestlinkrevs = manres['linkrevs']
113
82 # Find all file nodes referenced by added manifests and fetch those 114 # Find all file nodes referenced by added manifests and fetch those
83 # revisions. 115 # revisions.
84 fnodes = _derivefilesfrommanifests(repo, narrowmatcher, manres['added']) 116 fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
85 _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetres['added'], 117 _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetsforfiles,
86 manres['linkrevs']) 118 manifestlinkrevs)
119
def _checkuserawstorefiledata(pullop):
    """Report whether this pull should bootstrap via ``rawstorefiledata``.

    Returns True only when all of the following hold, checked in this order:
    the remote's API descriptor lists the ``rawstorefiledata`` command, the
    pull operation has ``streamclonerequested`` set, the local repo has a
    length of zero, and ``revlogv1`` is among the local repo's requirements.
    Otherwise returns False.
    """
    localrepo = pullop.repo
    peer = pullop.remote

    # The command to obtain raw store data must be advertised by the remote.
    if b'rawstorefiledata' in peer.apidescriptor[b'commands']:
        # Only honor an explicit stream clone request, and only for an
        # empty local repo.
        if pullop.streamclonerequested and len(localrepo) == 0:
            # TODO This is super hacky. There needs to be a storage API for
            # this. We also need to check for compatibility with the remote.
            return b'revlogv1' in localrepo.requirements

    return False
144
def _fetchrawstorefiles(repo, remote):
    """Fetch raw changelog and manifestlog files from ``remote`` into ``repo``.

    Issues the ``rawstorefiledata`` command and consumes the objects it
    yields: first a summary object (read for its ``totalsize``), then
    repeated pairs of a file metadata object followed by the chunks holding
    that file's data. Each file is written through the repo's store vfs at
    the path given in its metadata.

    Raises ``error.Abort`` when a metadata object lacks one of the
    ``location``, ``path`` or ``size`` keys, when ``location`` is anything
    other than ``store``, when a data chunk has no ``islast`` attribute, or
    when the number of bytes received for a file does not match the size
    declared in its metadata.
    """
    with remote.commandexecutor() as e:
        objs = e.callcommand(b'rawstorefiledata', {
            b'files': [b'changelog', b'manifestlog'],
        }).result()

        # First object is a summary of files data that follows.
        overall = next(objs)

        progress = repo.ui.makeprogress(_('clone'), total=overall[b'totalsize'],
                                        unit=_('bytes'))
        with progress:
            progress.update(0)

            # Next are pairs of file metadata, data.
            while True:
                try:
                    filemeta = next(objs)
                except StopIteration:
                    break

                for k in (b'location', b'path', b'size'):
                    if k not in filemeta:
                        raise error.Abort(_(b'remote file data missing key: %s')
                                          % k)

                # Only the store location is accepted.
                if filemeta[b'location'] == b'store':
                    vfs = repo.svfs
                else:
                    raise error.Abort(_(b'invalid location for raw file data: '
                                        b'%s') % filemeta[b'location'])

                # Counts down from the declared size; must land on exactly 0.
                bytesremaining = filemeta[b'size']

                # NOTE(review): filemeta[b'path'] is supplied by the remote;
                # presumably vfs.open() audits it -- confirm.
                with vfs.open(filemeta[b'path'], b'wb') as fh:
                    while True:
                        try:
                            chunk = next(objs)
                        except StopIteration:
                            # Stream ended early; the size check after this
                            # block reports any shortfall.
                            break

                        bytesremaining -= len(chunk)

                        if bytesremaining < 0:
                            raise error.Abort(_(
                                b'received invalid number of bytes for file '
                                b'data; expected %d, got extra') %
                                              filemeta[b'size'])

                        progress.increment(step=len(chunk))
                        fh.write(chunk)

                        # The final chunk of a file is flagged via ``islast``;
                        # a chunk without that attribute is rejected.
                        try:
                            if chunk.islast:
                                break
                        except AttributeError:
                            raise error.Abort(_(
                                b'did not receive indefinite length bytestring '
                                b'for file data'))

                if bytesremaining:
                    # The two adjacent literals are concatenated as-is, so the
                    # first must end with a space to avoid "forfile data".
                    raise error.Abort(_(b'received invalid number of bytes for '
                                        b'file data; expected %d got %d') %
                                      (filemeta[b'size'],
                                       filemeta[b'size'] - bytesremaining))
87 210
88 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): 211 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
89 """Determine which changesets need to be pulled.""" 212 """Determine which changesets need to be pulled."""
90 213
91 if heads: 214 if heads: