comparison mercurial/metadata.py @ 44940:4c1d39215034

metadata: move computation related to files touched in a dedicated module This was suggested by Yuya Nishihara a while back. Since I am about to add more metadata-related computation, let's create a new repository. Differential Revision: https://phab.mercurial-scm.org/D8587
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 27 May 2020 12:26:08 +0200
parents
children edd08aa193fb
comparison
equal deleted inserted replaced
44939:818b4f19ef23 44940:4c1d39215034
1 # metadata.py -- code related to various metadata computation and access.
2 #
3 # Copyright 2019 Google, Inc <martinvonz@google.com>
4 # Copyright 2020 Pierre-Yves David <pierre-yves.david@octobus.net>
5 #
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
8 from __future__ import absolute_import, print_function
9
10 import multiprocessing
11
12 from . import (
13 error,
14 pycompat,
15 util,
16 )
17
18 from .revlogutils import (
19 flagutil as sidedataflag,
20 sidedata as sidedatamod,
21 )
22
23
def computechangesetfilesadded(ctx):
    """return the list of files added in a changeset

    A file is "added" when it is touched by the changeset but present in
    none of its parents.
    """
    parents = ctx.parents()
    return [f for f in ctx.files() if not any(f in p for p in parents)]
32
33
def computechangesetfilesremoved(ctx):
    """return the list of files removed in a changeset

    A file is "removed" when it is touched by the changeset but no longer
    present in it.
    """
    return [f for f in ctx.files() if f not in ctx]
42
43
def computechangesetcopies(ctx):
    """return the copies data for a changeset

    The copies data are returned as a pair of dictionaries
    (p1copies, p2copies), each of the form: `{newname: oldname}`.

    A copy is attributed to a parent only when the copy source exists in
    that parent with the exact filenode recorded by the rename.
    """
    p1copies = {}
    p2copies = {}
    parent1 = ctx.p1()
    parent2 = ctx.p2()
    narrowmatch = ctx._repo.narrowmatch()
    for dst in ctx.files():
        # skip files outside the narrow spec and files removed in `ctx`
        if not narrowmatch(dst):
            continue
        if dst not in ctx:
            continue
        rename = ctx[dst].renamed()
        if not rename:
            continue
        src, srcnode = rename
        if src in parent1 and parent1[src].filenode() == srcnode:
            p1copies[dst] = src
        elif src in parent2 and parent2[src].filenode() == srcnode:
            p2copies[dst] = src
    return p1copies, p2copies
68
69
def encodecopies(files, copies):
    """encode the `copies` map as newline-separated `<index>NUL<source>` records

    Indices refer to the position of each copy target within `files`. Every
    copy target must be present in `files`.
    """
    records = [
        b'%d\0%s' % (index, copies[name])
        for index, name in enumerate(files)
        if name in copies
    ]
    if len(records) != len(copies):
        raise error.ProgrammingError(
            b'some copy targets missing from file list'
        )
    return b"\n".join(records)
80
81
def decodecopies(files, data):
    """decode a copies map produced by `encodecopies`

    Returns `{dst: src}` mapping indices in `data` back into `files`, or
    `None` when the payload cannot be parsed.
    """
    copies = {}
    if not data:
        return copies
    try:
        for record in data.split(b'\n'):
            rawindex, src = record.split(b'\0')
            copies[files[int(rawindex)]] = src
    except (ValueError, IndexError):
        # Perhaps someone had chosen the same key name (e.g. "p1copies") and
        # used different syntax for the value.
        return None
    return copies
97
98
def encodefileindices(files, subset):
    """encode the positions (within `files`) of the members of `subset`

    Returns a newline-separated list of decimal indices, in `files` order.
    """
    members = set(subset)
    return b'\n'.join(
        b'%d' % index
        for index, name in enumerate(files)
        if name in members
    )
106
107
def decodefileindices(files, data):
    """decode a file subset produced by `encodefileindices`

    Returns the list of files referenced by the indices in `data`, or `None`
    when the payload cannot be parsed or an index is out of range.
    """
    subset = []
    if not data:
        return subset
    nbfiles = len(files)
    try:
        for chunk in data.split(b'\n'):
            index = int(chunk)
            if not (0 <= index < nbfiles):
                return None
            subset.append(files[index])
    except (ValueError, IndexError):
        # Perhaps someone had chosen the same key name (e.g. "added") and
        # used different syntax for the value.
        return None
    return subset
123
124
def _getsidedata(srcrepo, rev):
    """return the sidedata map to store for revision `rev` of `srcrepo`

    The map may contain copies (for p1 and p2), added files and removed
    files, keyed by the `sidedatamod.SD_*` constants. Entries whose encoded
    value is empty are omitted, so an uninteresting revision yields `{}`.
    """
    ctx = srcrepo[rev]
    p1copies, p2copies = computechangesetcopies(ctx)
    filesadded = computechangesetfilesadded(ctx)
    filesremoved = computechangesetfilesremoved(ctx)
    sidedata = {}
    # Fix: the previous check `any([filescopies, filesadded, filesremoved])`
    # was always true because the copies pair is a (possibly empty) 2-tuple,
    # which is truthy. Test the actual content instead so that revisions with
    # no metadata skip the sorting/encoding work entirely.
    if p1copies or p2copies or filesadded or filesremoved:
        sortedfiles = sorted(ctx.files())
        p1copies = encodecopies(sortedfiles, p1copies)
        p2copies = encodecopies(sortedfiles, p2copies)
        filesadded = encodefileindices(sortedfiles, filesadded)
        filesremoved = encodefileindices(sortedfiles, filesremoved)
        if p1copies:
            sidedata[sidedatamod.SD_P1COPIES] = p1copies
        if p2copies:
            sidedata[sidedatamod.SD_P2COPIES] = p2copies
        if filesadded:
            sidedata[sidedatamod.SD_FILESADDED] = filesadded
        if filesremoved:
            sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved
    return sidedata
147
148
def getsidedataadder(srcrepo, destrepo):
    """return a sidedata companion computing copy-tracing sidedata on upgrade

    Picks the multiprocessing implementation when the experimental worker
    config is enabled, except on Windows where only the simple, in-thread
    version is used.
    """
    use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade')
    if use_w and not pycompat.iswindows:
        return _get_worker_sidedata_adder(srcrepo, destrepo)
    return _get_simple_sidedata_adder(srcrepo, destrepo)
155
156
157 def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
158 """The function used by worker precomputing sidedata
159
160 It read an input queue containing revision numbers
161 It write in an output queue containing (rev, <sidedata-map>)
162
163 The `None` input value is used as a stop signal.
164
165 The `tokens` semaphore is user to avoid having too many unprocessed
166 entries. The workers needs to acquire one token before fetching a task.
167 They will be released by the consumer of the produced data.
168 """
169 tokens.acquire()
170 rev = revs_queue.get()
171 while rev is not None:
172 data = _getsidedata(srcrepo, rev)
173 sidedata_queue.put((rev, data))
174 tokens.acquire()
175 rev = revs_queue.get()
176 # processing of `None` is completed, release the token.
177 tokens.release()
178
179
# Per-worker buffering factor: the bounded semaphore guarding the sidedata
# queue is sized to `nbworkers * BUFF_PER_WORKER`, capping how many
# precomputed entries can sit unconsumed.
BUFF_PER_WORKER = 50
181
182
def _get_worker_sidedata_adder(srcrepo, destrepo):
    """The parallel version of the sidedata computation

    This code spawns a pool of workers that precompute a buffer of sidedata
    before we actually need them. It returns a `sidedata_companion(revlog,
    rev)` callable suitable for the upgrade machinery."""
    # avoid circular import copies -> scmutil -> worker -> copies
    from . import worker

    nbworkers = worker._numworkers(srcrepo.ui)

    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
    revsq = multiprocessing.Queue()
    sidedataq = multiprocessing.Queue()

    assert srcrepo.filtername is None
    # queue all tasks beforehand, revision numbers are small and it makes
    # synchronisation simpler
    #
    # Since the computation for each node can be quite expensive, the overhead
    # of using a single queue is not relevant. In practice, most computations
    # are fast but some are very expensive and dominate all the other smaller
    # costs.
    for r in srcrepo.changelog.revs():
        revsq.put(r)
    # queue the "no more tasks" markers
    for i in range(nbworkers):
        revsq.put(None)

    allworkers = []
    for i in range(nbworkers):
        args = (srcrepo, revsq, sidedataq, tokens)
        w = multiprocessing.Process(target=_sidedata_worker, args=args)
        allworkers.append(w)
        w.start()

    # dictionary to store results for revisions higher than the one we are
    # looking for. For example, if we need the sidedata map for 42 and 43 is
    # received, we shelve 43 for later use.
    staging = {}

    def sidedata_companion(revlog, rev):
        sidedata = {}
        # Fix: the attribute name must be a native str (it was b'filteredrevs',
        # which `getattr` rejects on Python 3), matching the sibling companions.
        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
            # Is the data previously shelved ?
            sidedata = staging.pop(rev, None)
            if sidedata is None:
                # look at the queued results until we find the one we are
                # looking for (shelve the other ones)
                r, sidedata = sidedataq.get()
                while r != rev:
                    staging[r] = sidedata
                    r, sidedata = sidedataq.get()
            tokens.release()
        return False, (), sidedata

    return sidedata_companion
239
240
def _get_simple_sidedata_adder(srcrepo, destrepo):
    """The simple version of the sidedata computation

    It just computes the sidedata in the same thread, on request."""

    def sidedatacompanion(revlog, rev):
        # only the changelog (identified by its `filteredrevs` attribute)
        # carries this sidedata; other revlogs get an empty map
        if util.safehasattr(revlog, 'filteredrevs'):
            return False, (), _getsidedata(srcrepo, rev)
        return False, (), {}

    return sidedatacompanion
253
254
def getsidedataremover(srcrepo, destrepo):
    """return a sidedata companion removing all copy-tracing sidedata

    On changelog revisions flagged as carrying sidedata, it requests removal
    of every `SD_*` key written by the adder companions."""

    def sidedatacompanion(revlog, rev):
        removals = ()
        # only the changelog (identified by its `filteredrevs` attribute)
        # stores this sidedata
        if util.safehasattr(revlog, 'filteredrevs'):
            if revlog.flags(rev) & sidedataflag.REVIDX_SIDEDATA:
                removals = (
                    sidedatamod.SD_P1COPIES,
                    sidedatamod.SD_P2COPIES,
                    sidedatamod.SD_FILESADDED,
                    sidedatamod.SD_FILESREMOVED,
                )
        return False, removals, {}

    return sidedatacompanion