Mercurial > hg
comparison mercurial/metadata.py @ 44940:4c1d39215034
metadata: move computation related to files touched in a dedicated module
This was suggested by Yuya Nishihara a while back. Since I am about to add more
metadata-related computation, let's create a new module.
Differential Revision: https://phab.mercurial-scm.org/D8587
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Wed, 27 May 2020 12:26:08 +0200 |
parents | |
children | edd08aa193fb |
comparison
equal
deleted
inserted
replaced
44939:818b4f19ef23 | 44940:4c1d39215034 |
---|---|
1 # metadata.py -- code related to various metadata computation and access. | |
2 # | |
3 # Copyright 2019 Google, Inc <martinvonz@google.com> | |
4 # Copyright 2020 Pierre-Yves David <pierre-yves.david@octobus.net> | |
5 # | |
6 # This software may be used and distributed according to the terms of the | |
7 # GNU General Public License version 2 or any later version. | |
8 from __future__ import absolute_import, print_function | |
9 | |
10 import multiprocessing | |
11 | |
12 from . import ( | |
13 error, | |
14 pycompat, | |
15 util, | |
16 ) | |
17 | |
18 from .revlogutils import ( | |
19 flagutil as sidedataflag, | |
20 sidedata as sidedatamod, | |
21 ) | |
22 | |
23 | |
def computechangesetfilesadded(ctx):
    """return the list of files added in a changeset

    A file counts as added when the changeset touches it and it is
    present in none of the changeset's parents.
    """
    parents = ctx.parents()
    return [f for f in ctx.files() if not any(f in p for p in parents)]
32 | |
33 | |
def computechangesetfilesremoved(ctx):
    """return the list of files removed in a changeset

    A file counts as removed when the changeset touches it but it no
    longer exists in the changeset itself.
    """
    return [f for f in ctx.files() if f not in ctx]
42 | |
43 | |
def computechangesetcopies(ctx):
    """return the copies data for a changeset

    The result is a pair of dictionaries ``(p1copies, p2copies)``, each
    in the form ``{newname: oldname}``, attributing every copy to the
    parent that holds the matching source file.
    """
    p1copies = {}
    p2copies = {}
    parent1 = ctx.p1()
    parent2 = ctx.p2()
    narrowmatch = ctx._repo.narrowmatch()
    for dst in ctx.files():
        # skip files outside the narrowspec and files absent from ctx
        if not narrowmatch(dst) or dst not in ctx:
            continue
        rename = ctx[dst].renamed()
        if not rename:
            continue
        src, srcnode = rename
        # credit the copy to whichever parent has the exact source node
        if src in parent1 and parent1[src].filenode() == srcnode:
            p1copies[dst] = src
        elif src in parent2 and parent2[src].filenode() == srcnode:
            p2copies[dst] = src
    return p1copies, p2copies
68 | |
69 | |
def encodecopies(files, copies):
    """encode a ``{dest: source}`` copies dict against a file list

    Each copy is serialized as ``<index>\\0<source>`` where ``<index>``
    is the position of the destination in ``files``; entries are joined
    with newlines. Raises ProgrammingError if some copy destination is
    missing from ``files``.
    """
    encoded = [
        b'%d\0%s' % (idx, copies[name])
        for idx, name in enumerate(files)
        if name in copies
    ]
    if len(encoded) != len(copies):
        raise error.ProgrammingError(
            b'some copy targets missing from file list'
        )
    return b"\n".join(encoded)
80 | |
81 | |
def decodecopies(files, data):
    """decode copies information produced by ``encodecopies``

    Returns a ``{dest: source}`` dict, or None when the payload cannot
    be parsed.
    """
    copies = {}
    if not data:
        return copies
    try:
        for entry in data.split(b'\n'):
            strindex, src = entry.split(b'\0')
            copies[files[int(strindex)]] = src
    except (ValueError, IndexError):
        # Perhaps someone had chosen the same key name (e.g. "p1copies")
        # and used different syntax for the value.
        return None
    return copies
97 | |
98 | |
def encodefileindices(files, subset):
    """encode a subset of ``files`` as newline-separated decimal indices"""
    members = set(subset)
    return b'\n'.join(
        b'%d' % idx for idx, name in enumerate(files) if name in members
    )
106 | |
107 | |
def decodefileindices(files, data):
    """decode a file subset produced by ``encodefileindices``

    Returns the list of selected file names, or None when the payload
    is invalid (non-numeric or out-of-range index).
    """
    subset = []
    if not data:
        return subset
    try:
        for strindex in data.split(b'\n'):
            idx = int(strindex)
            if not (0 <= idx < len(files)):
                # out-of-range index: not data we encoded
                return None
            subset.append(files[idx])
    except (ValueError, IndexError):
        # Perhaps someone had chosen the same key name (e.g. "added")
        # and used different syntax for the value.
        return None
    return subset
123 | |
124 | |
def _getsidedata(srcrepo, rev):
    """compute the copy-tracing sidedata entries for revision ``rev``

    Returns a (possibly empty) mapping of sidedata keys to their encoded
    values (copies from each parent, files added, files removed).
    """
    ctx = srcrepo[rev]
    copies = computechangesetcopies(ctx)
    added = computechangesetfilesadded(ctx)
    removed = computechangesetfilesremoved(ctx)
    sidedata = {}
    if any([copies, added, removed]):
        sortedfiles = sorted(ctx.files())
        p1copies, p2copies = copies
        encoded = {
            sidedatamod.SD_P1COPIES: encodecopies(sortedfiles, p1copies),
            sidedatamod.SD_P2COPIES: encodecopies(sortedfiles, p2copies),
            sidedatamod.SD_FILESADDED: encodefileindices(sortedfiles, added),
            sidedatamod.SD_FILESREMOVED: encodefileindices(
                sortedfiles, removed
            ),
        }
        # only store keys whose encoded payload is non-empty
        sidedata = {key: value for key, value in encoded.items() if value}
    return sidedata
147 | |
148 | |
def getsidedataadder(srcrepo, destrepo):
    """return a sidedata companion callable that adds copy-tracing data

    The parallel (multi-process) implementation is used when the
    experimental worker config is enabled and we are not on Windows;
    otherwise the simple in-process implementation is returned.
    """
    parallel = srcrepo.ui.configbool(
        b'experimental', b'worker.repository-upgrade'
    )
    if parallel and not pycompat.iswindows:
        return _get_worker_sidedata_adder(srcrepo, destrepo)
    return _get_simple_sidedata_adder(srcrepo, destrepo)
155 | |
156 | |
def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
    """The function used by workers precomputing sidedata

    It reads revision numbers from an input queue and writes
    ``(rev, <sidedata-map>)`` pairs to an output queue.

    The `None` input value is used as a stop signal.

    The `tokens` semaphore is used to avoid having too many unprocessed
    entries. A worker needs to acquire one token before fetching a task.
    Tokens are released by the consumer of the produced data.
    """
    while True:
        tokens.acquire()
        rev = revs_queue.get()
        if rev is None:
            # processing of `None` is completed, release the token.
            tokens.release()
            return
        sidedata_queue.put((rev, _getsidedata(srcrepo, rev)))
178 | |
179 | |
180 BUFF_PER_WORKER = 50 | |
181 | |
182 | |
def _get_worker_sidedata_adder(srcrepo, destrepo):
    """The parallel version of the sidedata computation

    This code spawns a pool of workers that precompute a buffer of
    sidedata before we actually need them.

    Returns a ``sidedata_companion(revlog, rev)`` callable suitable for
    use during a revlog clone/upgrade.
    """
    # avoid circular import copies -> scmutil -> worker -> copies
    from . import worker

    nbworkers = worker._numworkers(srcrepo.ui)

    # bound the number of results that can sit unconsumed in the queue
    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
    revsq = multiprocessing.Queue()
    sidedataq = multiprocessing.Queue()

    assert srcrepo.filtername is None
    # queue all tasks beforehand, revision numbers are small and it makes
    # synchronisation simpler
    #
    # Since the computation for each node can be quite expensive, the overhead
    # of using a single queue is not relevant. In practice, most computations
    # are fast but some are very expensive and dominate all the other smaller
    # costs.
    for r in srcrepo.changelog.revs():
        revsq.put(r)
    # queue the "no more tasks" markers, one per worker
    for i in range(nbworkers):
        revsq.put(None)

    allworkers = []
    for i in range(nbworkers):
        args = (srcrepo, revsq, sidedataq, tokens)
        w = multiprocessing.Process(target=_sidedata_worker, args=args)
        allworkers.append(w)
        w.start()

    # dictionary to store results for revisions higher than the one we are
    # looking for. For example, if we need the sidedatamap for 42, and 43 is
    # received, we shelve 43 for later use.
    staging = {}

    def sidedata_companion(revlog, rev):
        sidedata = {}
        # Attribute names must be native str: passing bytes to getattr
        # (inside safehasattr) raises TypeError on Python 3. The sibling
        # companions in this module already use the str form.
        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
            # Is the data previously shelved ?
            sidedata = staging.pop(rev, None)
            if sidedata is None:
                # look at the queued results until we find the one we are
                # looking for (shelve the other ones)
                r, sidedata = sidedataq.get()
                while r != rev:
                    staging[r] = sidedata
                    r, sidedata = sidedataq.get()
            # one result consumed: allow a worker to produce another
            tokens.release()
        return False, (), sidedata

    return sidedata_companion
239 | |
240 | |
def _get_simple_sidedata_adder(srcrepo, destrepo):
    """The simple version of the sidedata computation

    It just computes the sidedata in the same thread, on request."""

    def sidedatacompanion(revlog, rev):
        # only changelog entries carry the copy-tracing sidedata
        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
            return False, (), _getsidedata(srcrepo, rev)
        return False, (), {}

    return sidedatacompanion
253 | |
254 | |
def getsidedataremover(srcrepo, destrepo):
    """return a sidedata companion that strips copy-tracing sidedata

    Only changelog revisions carrying the sidedata flag are affected;
    revisions of other revlogs pass through untouched.
    """

    def sidedatacompanion(revlog, rev):
        toremove = ()
        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
            if revlog.flags(rev) & sidedataflag.REVIDX_SIDEDATA:
                # drop every copy-tracing sidedata key we know about
                toremove = (
                    sidedatamod.SD_P1COPIES,
                    sidedatamod.SD_P2COPIES,
                    sidedatamod.SD_FILESADDED,
                    sidedatamod.SD_FILESREMOVED,
                )
        return False, toremove, {}

    return sidedatacompanion