Mercurial > hg
comparison mercurial/copies.py @ 44940:4c1d39215034
metadata: move computation related to files touched in a dedicated module
This was suggested by Yuya Nishihara a while back. Since I am about to add more
metadata-related computation, let's create a new module.
Differential Revision: https://phab.mercurial-scm.org/D8587
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Wed, 27 May 2020 12:26:08 +0200 |
parents | 61719b9658b1 |
children | cfd06649a1b8 |
comparison
equal
deleted
inserted
replaced
44939:818b4f19ef23 | 44940:4c1d39215034 |
---|---|
6 # GNU General Public License version 2 or any later version. | 6 # GNU General Public License version 2 or any later version. |
7 | 7 |
8 from __future__ import absolute_import | 8 from __future__ import absolute_import |
9 | 9 |
10 import collections | 10 import collections |
11 import multiprocessing | |
12 import os | 11 import os |
13 | 12 |
14 from .i18n import _ | 13 from .i18n import _ |
15 | 14 |
16 | 15 |
17 from .revlogutils.flagutil import REVIDX_SIDEDATA | 16 from .revlogutils.flagutil import REVIDX_SIDEDATA |
18 | 17 |
19 from . import ( | 18 from . import ( |
20 error, | |
21 match as matchmod, | 19 match as matchmod, |
22 node, | 20 node, |
23 pathutil, | 21 pathutil, |
24 pycompat, | 22 pycompat, |
25 util, | 23 util, |
26 ) | 24 ) |
27 | 25 |
28 from .revlogutils import sidedata as sidedatamod | |
29 | 26 |
30 from .utils import stringutil | 27 from .utils import stringutil |
31 | 28 |
32 | 29 |
33 def _filter(src, dst, t): | 30 def _filter(src, dst, t): |
990 """ | 987 """ |
991 new_copies = pathcopies(base, ctx) | 988 new_copies = pathcopies(base, ctx) |
992 _filter(wctx.p1(), wctx, new_copies) | 989 _filter(wctx.p1(), wctx, new_copies) |
993 for dst, src in pycompat.iteritems(new_copies): | 990 for dst, src in pycompat.iteritems(new_copies): |
994 wctx[dst].markcopied(src) | 991 wctx[dst].markcopied(src) |
995 | |
996 | |
def computechangesetfilesadded(ctx):
    """Return the list of files added in a changeset.

    A file counts as added when it is touched by the changeset and none
    of the changeset's parents contain it.
    """
    parents = ctx.parents()
    return [f for f in ctx.files() if not any(f in p for p in parents)]
1005 | |
1006 | |
def computechangesetfilesremoved(ctx):
    """Return the list of files removed in a changeset.

    A file counts as removed when it is touched by the changeset but is
    no longer present in it.
    """
    return [f for f in ctx.files() if f not in ctx]
1015 | |
1016 | |
def computechangesetcopies(ctx):
    """Return the copies data for a changeset.

    The copies data are returned as a ``(p1copies, p2copies)`` pair.

    Each element is a dictionary of the form ``{newname: oldname}``.
    """
    p1copies = {}
    p2copies = {}
    parent1 = ctx.p1()
    parent2 = ctx.p2()
    narrowmatch = ctx._repo.narrowmatch()
    for dst in ctx.files():
        # skip files outside the narrowspec, and files only touched by
        # the changeset through removal
        if not narrowmatch(dst) or dst not in ctx:
            continue
        rename = ctx[dst].renamed()
        if rename:
            src, srcnode = rename
            # attribute the copy to whichever parent holds the source at
            # the exact filelog node recorded in the rename metadata
            if src in parent1 and parent1[src].filenode() == srcnode:
                p1copies[dst] = src
            elif src in parent2 and parent2[src].filenode() == srcnode:
                p2copies[dst] = src
    return p1copies, p2copies
1041 | |
1042 | |
def encodecopies(files, copies):
    """Encode a ``{dst: src}`` copies dict against a file list.

    Each entry becomes an ``index NUL source`` record, records joined by
    newlines; ``index`` is the position of ``dst`` in ``files``.

    Raises ``error.ProgrammingError`` when some copy target is missing
    from ``files``.
    """
    entries = [
        b'%d\0%s' % (idx, copies[dst])
        for idx, dst in enumerate(files)
        if dst in copies
    ]
    if len(entries) != len(copies):
        raise error.ProgrammingError(
            b'some copy targets missing from file list'
        )
    return b"\n".join(entries)
1053 | |
1054 | |
def decodecopies(files, data):
    """Decode copies ``data`` produced by ``encodecopies``.

    Returns a ``{dst: src}`` dict, or ``None`` when the data cannot be
    parsed against ``files``.
    """
    copies = {}
    if not data:
        return copies
    try:
        for line in data.split(b'\n'):
            rawindex, src = line.split(b'\0')
            copies[files[int(rawindex)]] = src
    except (ValueError, IndexError):
        # Perhaps someone had chosen the same key name (e.g. "p1copies")
        # and used different syntax for the value.
        return None
    return copies
1070 | |
1071 | |
def encodefileindices(files, subset):
    """Encode ``subset`` as newline-separated indices into ``files``.

    Entries of ``subset`` that do not appear in ``files`` are silently
    ignored.
    """
    members = set(subset)
    return b'\n'.join(
        b'%d' % idx for idx, f in enumerate(files) if f in members
    )
1079 | |
1080 | |
def decodefileindices(files, data):
    """Decode index ``data`` produced by ``encodefileindices``.

    Returns the corresponding sublist of ``files``, or ``None`` when the
    data is malformed or an index is out of range.
    """
    if not data:
        return []
    subset = []
    try:
        for raw in data.split(b'\n'):
            idx = int(raw)
            if not (0 <= idx < len(files)):
                return None
            subset.append(files[idx])
    except (ValueError, IndexError):
        # Perhaps someone had chosen the same key name (e.g. "added")
        # and used different syntax for the value.
        return None
    return subset
1096 | |
1097 | |
def _getsidedata(srcrepo, rev):
    """Compute the copies-related sidedata entries for revision ``rev``.

    Returns a (possibly empty) ``{sidedata-key: encoded-value}`` dict
    covering p1/p2 copies and the added/removed file lists.
    """
    ctx = srcrepo[rev]
    copies = computechangesetcopies(ctx)
    added = computechangesetfilesadded(ctx)
    removed = computechangesetfilesremoved(ctx)
    sidedata = {}
    if any([copies, added, removed]):
        allfiles = sorted(ctx.files())
        p1copies, p2copies = copies
        # encode each piece, keeping only the non-empty results
        encoded = {
            sidedatamod.SD_P1COPIES: encodecopies(allfiles, p1copies),
            sidedatamod.SD_P2COPIES: encodecopies(allfiles, p2copies),
            sidedatamod.SD_FILESADDED: encodefileindices(allfiles, added),
            sidedatamod.SD_FILESREMOVED: encodefileindices(allfiles, removed),
        }
        for key, value in encoded.items():
            if value:
                sidedata[key] = value
    return sidedata
1120 | |
1121 | |
def getsidedataadder(srcrepo, destrepo):
    """Return the sidedata companion used to upgrade ``srcrepo``.

    A pool of worker processes is used when enabled by the
    ``experimental.worker.repository-upgrade`` config knob and the
    platform is not Windows; otherwise sidedata is computed inline.
    """
    use_workers = srcrepo.ui.configbool(
        b'experimental', b'worker.repository-upgrade'
    )
    if use_workers and not pycompat.iswindows:
        return _get_worker_sidedata_adder(srcrepo, destrepo)
    return _get_simple_sidedata_adder(srcrepo, destrepo)
1128 | |
1129 | |
def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
    """The function run by workers precomputing sidedata.

    It reads revision numbers from an input queue and writes
    ``(rev, <sidedata-map>)`` pairs to an output queue.

    The ``None`` input value is used as a stop signal.

    The ``tokens`` semaphore is used to avoid having too many
    unprocessed entries. A worker needs to acquire one token before
    fetching a task; tokens are released by the consumer of the
    produced data.
    """
    while True:
        tokens.acquire()
        rev = revs_queue.get()
        if rev is None:
            break
        sidedata_queue.put((rev, _getsidedata(srcrepo, rev)))
    # processing of `None` is completed, release the token.
    tokens.release()
1151 | |
1152 | |
# how many precomputed sidedata entries each worker may keep in flight
BUFF_PER_WORKER = 50


def _get_worker_sidedata_adder(srcrepo, destrepo):
    """The parallel version of the sidedata computation.

    This code spawns a pool of workers that precompute a buffer of
    sidedata before we actually need them."""
    # avoid circular import copies -> scmutil -> worker -> copies
    from . import worker

    nbworkers = worker._numworkers(srcrepo.ui)

    # bound the amount of precomputed-but-unconsumed sidedata
    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
    revsq = multiprocessing.Queue()
    sidedataq = multiprocessing.Queue()

    assert srcrepo.filtername is None
    # queue all tasks beforehand, revision numbers are small and it makes
    # synchronisation simpler
    #
    # Since the computation for each node can be quite expensive, the
    # overhead of using a single queue is not relevant. In practice, most
    # computations are fast but some are very expensive and dominate all
    # the other smaller cost.
    for r in srcrepo.changelog.revs():
        revsq.put(r)
    # queue the "no more tasks" markers
    for _idx in range(nbworkers):
        revsq.put(None)

    allworkers = []
    for _idx in range(nbworkers):
        proc = multiprocessing.Process(
            target=_sidedata_worker, args=(srcrepo, revsq, sidedataq, tokens)
        )
        allworkers.append(proc)
        proc.start()

    # dictionary to store results for revisions higher than the one we
    # are looking for. For example, if we need the sidedata map for 42
    # and 43 is received, we shelve 43 for later use.
    staging = {}

    def sidedata_companion(revlog, rev):
        sidedata = {}
        if util.safehasattr(revlog, b'filteredrevs'):  # this is a changelog
            # Is the data previously shelved ?
            sidedata = staging.pop(rev, None)
            if sidedata is None:
                # look at the queued results until we find the one we
                # are looking for (shelve the other ones)
                r, sidedata = sidedataq.get()
                while r != rev:
                    staging[r] = sidedata
                    r, sidedata = sidedataq.get()
            tokens.release()
        return False, (), sidedata

    return sidedata_companion
1212 | |
1213 | |
def _get_simple_sidedata_adder(srcrepo, destrepo):
    """The simple version of the sidedata computation.

    It just computes it in the same thread, on request."""

    def sidedatacompanion(revlog, rev):
        # sidedata is only computed for the changelog
        if util.safehasattr(revlog, 'filteredrevs'):
            return False, (), _getsidedata(srcrepo, rev)
        return False, (), {}

    return sidedatacompanion
1226 | |
1227 | |
def getsidedataremover(srcrepo, destrepo):
    """Return a sidedata companion that strips copies-related sidedata.

    The companion requests removal of the copy-tracing sidedata keys on
    every changelog revision that carries sidedata.
    """

    def sidedatacompanion(revlog, rev):
        toremove = ()
        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
            if revlog.flags(rev) & REVIDX_SIDEDATA:
                toremove = (
                    sidedatamod.SD_P1COPIES,
                    sidedatamod.SD_P2COPIES,
                    sidedatamod.SD_FILESADDED,
                    sidedatamod.SD_FILESREMOVED,
                )
        return False, toremove, {}

    return sidedatacompanion