54 |
55 |
55 # Write bookmark updates. |
56 # Write bookmark updates. |
56 bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'], |
57 bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'], |
57 remote.url(), pullop.gettransaction, |
58 remote.url(), pullop.gettransaction, |
58 explicit=pullop.explicitbookmarks) |
59 explicit=pullop.explicitbookmarks) |
|
60 |
|
61 _fetchmanifests(repo, tr, remote, csetres['manifestnodes']) |
59 |
62 |
60 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): |
63 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): |
61 """Determine which changesets need to be pulled.""" |
64 """Determine which changesets need to be pulled.""" |
62 |
65 |
63 if heads: |
66 if heads: |
119 |
122 |
120 progress = repo.ui.makeprogress(_('changesets'), |
123 progress = repo.ui.makeprogress(_('changesets'), |
121 unit=_('chunks'), |
124 unit=_('chunks'), |
122 total=meta.get(b'totalitems')) |
125 total=meta.get(b'totalitems')) |
123 |
126 |
|
127 manifestnodes = {} |
|
128 |
124 def linkrev(node): |
129 def linkrev(node): |
125 repo.ui.debug('add changeset %s\n' % short(node)) |
130 repo.ui.debug('add changeset %s\n' % short(node)) |
126 # Linkrev for changelog is always self. |
131 # Linkrev for changelog is always self. |
127 return len(cl) |
132 return len(cl) |
128 |
133 |
129 def onchangeset(cl, node): |
134 def onchangeset(cl, node): |
130 progress.increment() |
135 progress.increment() |
|
136 |
|
137 revision = cl.changelogrevision(node) |
|
138 |
|
139 # We need to preserve the mapping of changelog revision to node |
|
140 # so we can set the linkrev accordingly when manifests are added. |
|
141 manifestnodes[cl.rev(node)] = revision.manifest |
131 |
142 |
132 nodesbyphase = {phase: set() for phase in phases.phasenames} |
143 nodesbyphase = {phase: set() for phase in phases.phasenames} |
133 remotebookmarks = {} |
144 remotebookmarks = {} |
134 |
145 |
135 # addgroup() expects a 7-tuple describing revisions. This normalizes |
146 # addgroup() expects a 7-tuple describing revisions. This normalizes |
176 |
187 |
177 return { |
188 return { |
178 'added': added, |
189 'added': added, |
179 'nodesbyphase': nodesbyphase, |
190 'nodesbyphase': nodesbyphase, |
180 'bookmarks': remotebookmarks, |
191 'bookmarks': remotebookmarks, |
|
192 'manifestnodes': manifestnodes, |
181 } |
193 } |
|
194 |
|
def _fetchmanifests(repo, tr, remote, manifestnodes):
    """Fetch manifest revisions referenced by pulled changesets.

    ``manifestnodes`` maps changelog revision number to the manifest node
    that changeset references. Manifests already present locally are
    skipped. Returns a dict with key ``added`` listing the manifest nodes
    that were added to the root manifest store.
    """
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(manifestnodes.iteritems()):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            # Iterating in sorted order means the lowest changelog revision
            # referencing this manifest becomes its linkrev.
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            if b'deltasize' in manifest:
                # Delta against an explicit base; payload follows.
                basenode = manifest[b'deltabasenode']
                delta = next(objs)
            elif b'revisionsize' in manifest:
                # Fulltext revision; synthesize a trivial delta against null.
                basenode = nullid
                revision = next(objs)
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # The value passed in is passed to the lookup function passed
                # to addgroup(). We already have a map of manifest node to
                # changelog revision number. So we just pass in the
                # manifest node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0
            )

            progress.increment()

    progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
                                    total=len(fetchnodes))

    # Fetch manifests 10,000 per command.
    # TODO have server advertise preferences?
    # TODO make size configurable on client?
    batchsize = 10000

    # We send commands 1 at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        # A plain slice suffices (the old identity list comprehension was a
        # pointless copy). The slice is never empty: xrange() only yields
        # offsets strictly below len(fetchnodes).
        batch = fetchnodes[i:i + batchsize]

        with remote.commandexecutor() as e:
            objs = e.callcommand(b'manifestdata', {
                b'tree': b'',
                b'nodes': batch,
                b'fields': {b'parents', b'revision'},
            }).result()

            # Chomp off header object.
            next(objs)

            added.extend(rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr)))

    progress.complete()

    return {
        'added': added,
    }