Mercurial > hg
comparison mercurial/exchangev2.py @ 40330:00a4cd368e3f
exchangev2: support for calling rawstorefiledata to retrieve raw files
This is somewhat hacky. For that I apologize.
At the 4.8 Sprint, we decided we wanted to land support in wireprotov2 for doing
a partial clone with the changelog and manifestlog bootstrapped from a
"stream clone"-like primitive.
This commit implements the client-side bits necessary to facilitate that.
If the new server-side command for obtaining raw file data is available, we
call it to get the raw files for the changelog and manifestlog. Then we
fall through to an incremental pull. But when fetching file data, instead
of using the list of changesets and manifests that we fetched via the
"changesetdata" command, we do a linear scan of the repo and resolve the
changeset and manifest nodes along with the manifest linkrevs.
Differential Revision: https://phab.mercurial-scm.org/D5135
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 17 Oct 2018 10:10:05 +0200 |
parents | 55836a34f41b |
children | 229d23cdb203 |
comparison
equal
deleted
inserted
replaced
40329:ed55a0077490 | 40330:00a4cd368e3f |
---|---|
27 | 27 |
28 def pull(pullop): | 28 def pull(pullop): |
29 """Pull using wire protocol version 2.""" | 29 """Pull using wire protocol version 2.""" |
30 repo = pullop.repo | 30 repo = pullop.repo |
31 remote = pullop.remote | 31 remote = pullop.remote |
32 | |
33 usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop) | |
34 | |
35 # If this is a clone and it was requested to perform a "stream clone", | |
36 # we obtain the raw files data from the remote then fall back to an | |
37 # incremental pull. This is somewhat hacky and is not nearly robust enough | |
38 # for long-term usage. | |
39 if usingrawchangelogandmanifest: | |
40 with repo.transaction('clone'): | |
41 _fetchrawstorefiles(repo, remote) | |
42 repo.invalidate(clearfilecache=True) | |
43 | |
32 tr = pullop.trmanager.transaction() | 44 tr = pullop.trmanager.transaction() |
33 | 45 |
34 # We don't use the repo's narrow matcher here because the patterns passed | 46 # We don't use the repo's narrow matcher here because the patterns passed |
35 # to exchange.pull() could be different. | 47 # to exchange.pull() could be different. |
36 narrowmatcher = narrowspec.match(repo.root, | 48 narrowmatcher = narrowspec.match(repo.root, |
77 remote.url(), pullop.gettransaction, | 89 remote.url(), pullop.gettransaction, |
78 explicit=pullop.explicitbookmarks) | 90 explicit=pullop.explicitbookmarks) |
79 | 91 |
80 manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes']) | 92 manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes']) |
81 | 93 |
94 # If obtaining the raw store files, we need to scan the full repo to | |
95 # derive all the changesets, manifests, and linkrevs. | |
96 if usingrawchangelogandmanifest: | |
97 csetsforfiles = [] | |
98 mnodesforfiles = [] | |
99 manifestlinkrevs = {} | |
100 | |
101 for rev in repo: | |
102 ctx = repo[rev] | |
103 mnode = ctx.manifestnode() | |
104 | |
105 csetsforfiles.append(ctx.node()) | |
106 mnodesforfiles.append(mnode) | |
107 manifestlinkrevs[mnode] = rev | |
108 | |
109 else: | |
110 csetsforfiles = csetres['added'] | |
111 mnodesforfiles = manres['added'] | |
112 manifestlinkrevs = manres['linkrevs'] | |
113 | |
82 # Find all file nodes referenced by added manifests and fetch those | 114 # Find all file nodes referenced by added manifests and fetch those |
83 # revisions. | 115 # revisions. |
84 fnodes = _derivefilesfrommanifests(repo, narrowmatcher, manres['added']) | 116 fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles) |
85 _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetres['added'], | 117 _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetsforfiles, |
86 manres['linkrevs']) | 118 manifestlinkrevs) |
119 | |
120 def _checkuserawstorefiledata(pullop): | |
121 """Check whether we should use rawstorefiledata command to retrieve data.""" | |
122 | |
123 repo = pullop.repo | |
124 remote = pullop.remote | |
125 | |
126 # Command to obtain raw store data isn't available. | |
127 if b'rawstorefiledata' not in remote.apidescriptor[b'commands']: | |
128 return False | |
129 | |
130 # Only honor if user requested stream clone operation. | |
131 if not pullop.streamclonerequested: | |
132 return False | |
133 | |
134 # Only works on empty repos. | |
135 if len(repo): | |
136 return False | |
137 | |
138 # TODO This is super hacky. There needs to be a storage API for this. We | |
139 # also need to check for compatibility with the remote. | |
140 if b'revlogv1' not in repo.requirements: | |
141 return False | |
142 | |
143 return True | |
144 | |
def _fetchrawstorefiles(repo, remote):
    """Download the raw changelog and manifestlog store files from ``remote``.

    Calls the remote ``rawstorefiledata`` command, requesting the
    ``changelog`` and ``manifestlog`` files. Each returned file is written
    verbatim into the local store vfs (``repo.svfs``). Progress is reported in
    bytes against the total size announced by the server.

    The response stream is consumed as:

    1. a summary object carrying ``totalsize``;
    2. for each file, a metadata object with ``location``, ``path`` and
       ``size`` keys, followed by the file contents as chunks that expose an
       ``islast`` attribute (an indefinite-length bytestring).

    Raises ``error.Abort`` in any of these cases:

    * the summary is missing;
    * file metadata lacks a required key or names a location other than
      ``store``;
    * the file data is not delivered as an indefinite-length bytestring;
    * the number of bytes received differs from the advertised size.
    """
    with remote.commandexecutor() as e:
        objs = e.callcommand(b'rawstorefiledata', {
            b'files': [b'changelog', b'manifestlog'],
        }).result()

        # First object is a summary of the files data that follows. Use a
        # default so an empty response aborts cleanly instead of leaking a
        # bare StopIteration to the caller.
        overall = next(objs, None)
        if overall is None:
            raise error.Abort(_(b'remote did not send raw file data summary'))

        progress = repo.ui.makeprogress(_('clone'),
                                        total=overall[b'totalsize'],
                                        unit=_('bytes'))
        with progress:
            progress.update(0)

            # Remaining objects come in pairs: a file metadata object followed
            # by that file's data. ``objs`` is a single shared iterator, so the
            # inner loop consumes the data chunks and the outer loop resumes
            # at the next metadata object.
            for filemeta in objs:
                for k in (b'location', b'path', b'size'):
                    if k not in filemeta:
                        raise error.Abort(_(b'remote file data missing key: %s')
                                          % k)

                if filemeta[b'location'] == b'store':
                    vfs = repo.svfs
                else:
                    raise error.Abort(_(b'invalid location for raw file data: '
                                        b'%s') % filemeta[b'location'])

                bytesremaining = filemeta[b'size']

                # NOTE(review): filemeta[b'path'] is supplied by the remote;
                # this relies on the store vfs auditing the path -- confirm.
                with vfs.open(filemeta[b'path'], b'wb') as fh:
                    for chunk in objs:
                        bytesremaining -= len(chunk)

                        if bytesremaining < 0:
                            raise error.Abort(_(
                                b'received invalid number of bytes for file '
                                b'data; expected %d, got extra') %
                                filemeta[b'size'])

                        progress.increment(step=len(chunk))
                        fh.write(chunk)

                        # File data must arrive as an indefinite-length
                        # bytestring whose chunks flag the final piece.
                        try:
                            islast = chunk.islast
                        except AttributeError:
                            raise error.Abort(_(
                                b'did not receive indefinite length bytestring '
                                b'for file data'))

                        if islast:
                            break

                # Non-zero here means the stream ended (or the final chunk was
                # flagged) before the advertised number of bytes arrived.
                if bytesremaining:
                    raise error.Abort(_(b'received invalid number of bytes for '
                                        b'file data; expected %d got %d') %
                                      (filemeta[b'size'],
                                       filemeta[b'size'] - bytesremaining))
87 | 210 |
88 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): | 211 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): |
89 """Determine which changesets need to be pulled.""" | 212 """Determine which changesets need to be pulled.""" |
90 | 213 |
91 if heads: | 214 if heads: |