comparison mercurial/copies.py @ 44940:4c1d39215034

metadata: move computation related to files touched in a dedicated module

This was suggested by Yuya Nishihara a while back. Since I am about to add more metadata-related computation, let's create a new module.

Differential Revision: https://phab.mercurial-scm.org/D8587
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 27 May 2020 12:26:08 +0200
parents 61719b9658b1
children cfd06649a1b8
comparing parent 44939:818b4f19ef23 with 44940:4c1d39215034
@@ -6,28 +6,25 @@
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import collections
-import multiprocessing
 import os
 
 from .i18n import _
 
 
 from .revlogutils.flagutil import REVIDX_SIDEDATA
 
 from . import (
-    error,
     match as matchmod,
     node,
     pathutil,
     pycompat,
     util,
 )
 
-from .revlogutils import sidedata as sidedatamod
 
 from .utils import stringutil
 
 
 def _filter(src, dst, t):
990 """ 987 """
991 new_copies = pathcopies(base, ctx) 988 new_copies = pathcopies(base, ctx)
992 _filter(wctx.p1(), wctx, new_copies) 989 _filter(wctx.p1(), wctx, new_copies)
993 for dst, src in pycompat.iteritems(new_copies): 990 for dst, src in pycompat.iteritems(new_copies):
994 wctx[dst].markcopied(src) 991 wctx[dst].markcopied(src)
-
-
-def computechangesetfilesadded(ctx):
-    """return the list of files added in a changeset
-    """
-    added = []
-    for f in ctx.files():
-        if not any(f in p for p in ctx.parents()):
-            added.append(f)
-    return added
-
-
-def computechangesetfilesremoved(ctx):
-    """return the list of files removed in a changeset
-    """
-    removed = []
-    for f in ctx.files():
-        if f not in ctx:
-            removed.append(f)
-    return removed
-
-
-def computechangesetcopies(ctx):
-    """return the copies data for a changeset
-
-    The copies data are returned as a pair of dictionaries (p1copies, p2copies).
-
-    Each dictionary is of the form: `{newname: oldname}`
-    """
-    p1copies = {}
-    p2copies = {}
-    p1 = ctx.p1()
-    p2 = ctx.p2()
-    narrowmatch = ctx._repo.narrowmatch()
-    for dst in ctx.files():
-        if not narrowmatch(dst) or dst not in ctx:
-            continue
-        copied = ctx[dst].renamed()
-        if not copied:
-            continue
-        src, srcnode = copied
-        if src in p1 and p1[src].filenode() == srcnode:
-            p1copies[dst] = src
-        elif src in p2 and p2[src].filenode() == srcnode:
-            p2copies[dst] = src
-    return p1copies, p2copies
-
-
-def encodecopies(files, copies):
-    items = []
-    for i, dst in enumerate(files):
-        if dst in copies:
-            items.append(b'%d\0%s' % (i, copies[dst]))
-    if len(items) != len(copies):
-        raise error.ProgrammingError(
-            b'some copy targets missing from file list'
-        )
-    return b"\n".join(items)
-
-
-def decodecopies(files, data):
-    try:
-        copies = {}
-        if not data:
-            return copies
-        for l in data.split(b'\n'):
-            strindex, src = l.split(b'\0')
-            i = int(strindex)
-            dst = files[i]
-            copies[dst] = src
-        return copies
-    except (ValueError, IndexError):
-        # Perhaps someone had chosen the same key name (e.g. "p1copies") and
-        # used different syntax for the value.
-        return None
-
-
-def encodefileindices(files, subset):
-    subset = set(subset)
-    indices = []
-    for i, f in enumerate(files):
-        if f in subset:
-            indices.append(b'%d' % i)
-    return b'\n'.join(indices)
-
-
-def decodefileindices(files, data):
-    try:
-        subset = []
-        if not data:
-            return subset
-        for strindex in data.split(b'\n'):
-            i = int(strindex)
-            if i < 0 or i >= len(files):
-                return None
-            subset.append(files[i])
-        return subset
-    except (ValueError, IndexError):
-        # Perhaps someone had chosen the same key name (e.g. "added") and
-        # used different syntax for the value.
-        return None
-
-
-def _getsidedata(srcrepo, rev):
-    ctx = srcrepo[rev]
-    filescopies = computechangesetcopies(ctx)
-    filesadded = computechangesetfilesadded(ctx)
-    filesremoved = computechangesetfilesremoved(ctx)
-    sidedata = {}
-    if any([filescopies, filesadded, filesremoved]):
-        sortedfiles = sorted(ctx.files())
-        p1copies, p2copies = filescopies
-        p1copies = encodecopies(sortedfiles, p1copies)
-        p2copies = encodecopies(sortedfiles, p2copies)
-        filesadded = encodefileindices(sortedfiles, filesadded)
-        filesremoved = encodefileindices(sortedfiles, filesremoved)
-        if p1copies:
-            sidedata[sidedatamod.SD_P1COPIES] = p1copies
-        if p2copies:
-            sidedata[sidedatamod.SD_P2COPIES] = p2copies
-        if filesadded:
-            sidedata[sidedatamod.SD_FILESADDED] = filesadded
-        if filesremoved:
-            sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved
-    return sidedata
-
-
-def getsidedataadder(srcrepo, destrepo):
-    use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade')
-    if pycompat.iswindows or not use_w:
-        return _get_simple_sidedata_adder(srcrepo, destrepo)
-    else:
-        return _get_worker_sidedata_adder(srcrepo, destrepo)
-
-
-def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
-    """The function used by workers precomputing sidedata
-
-    It reads an input queue containing revision numbers.
-    It writes to an output queue containing (rev, <sidedata-map>) pairs.
-
-    The `None` input value is used as a stop signal.
-
-    The `tokens` semaphore is used to avoid having too many unprocessed
-    entries. Each worker needs to acquire one token before fetching a task.
-    Tokens will be released by the consumer of the produced data.
-    """
-    tokens.acquire()
-    rev = revs_queue.get()
-    while rev is not None:
-        data = _getsidedata(srcrepo, rev)
-        sidedata_queue.put((rev, data))
-        tokens.acquire()
-        rev = revs_queue.get()
-    # processing of `None` is completed, release the token.
-    tokens.release()
-
-
-BUFF_PER_WORKER = 50
-
-
-def _get_worker_sidedata_adder(srcrepo, destrepo):
-    """The parallel version of the sidedata computation
-
-    This code spawns a pool of workers that precompute a buffer of sidedata
-    before we actually need it"""
-    # avoid circular import copies -> scmutil -> worker -> copies
-    from . import worker
-
-    nbworkers = worker._numworkers(srcrepo.ui)
-
-    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
-    revsq = multiprocessing.Queue()
-    sidedataq = multiprocessing.Queue()
-
-    assert srcrepo.filtername is None
-    # queue all tasks beforehand, revision numbers are small and it makes
-    # synchronisation simpler
-    #
-    # Since the computation for each node can be quite expensive, the overhead
-    # of using a single queue is not relevant. In practice, most computations
-    # are fast but some are very expensive and dominate all the other smaller
-    # costs.
-    for r in srcrepo.changelog.revs():
-        revsq.put(r)
-    # queue the "no more tasks" markers
-    for i in range(nbworkers):
-        revsq.put(None)
-
-    allworkers = []
-    for i in range(nbworkers):
-        args = (srcrepo, revsq, sidedataq, tokens)
-        w = multiprocessing.Process(target=_sidedata_worker, args=args)
-        allworkers.append(w)
-        w.start()
-
-    # dictionary to store results for revisions higher than the one we are
-    # looking for. For example, if we need the sidedata map for 42 and 43 is
-    # received, we shelve 43 for later use.
-    staging = {}
-
-    def sidedata_companion(revlog, rev):
-        sidedata = {}
-        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
-            # Was the data previously shelved?
-            sidedata = staging.pop(rev, None)
-            if sidedata is None:
-                # look at the queued results until we find the one we are
-                # looking for (shelve the other ones)
-                r, sidedata = sidedataq.get()
-                while r != rev:
-                    staging[r] = sidedata
-                    r, sidedata = sidedataq.get()
-            tokens.release()
-        return False, (), sidedata
-
-    return sidedata_companion
-
-
-def _get_simple_sidedata_adder(srcrepo, destrepo):
-    """The simple version of the sidedata computation
-
-    It just computes the data in the same thread, on request"""
-
-    def sidedatacompanion(revlog, rev):
-        sidedata = {}
-        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
-            sidedata = _getsidedata(srcrepo, rev)
-        return False, (), sidedata
-
-    return sidedatacompanion
-
-
-def getsidedataremover(srcrepo, destrepo):
-    def sidedatacompanion(revlog, rev):
-        f = ()
-        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
-            if revlog.flags(rev) & REVIDX_SIDEDATA:
-                f = (
-                    sidedatamod.SD_P1COPIES,
-                    sidedatamod.SD_P2COPIES,
-                    sidedatamod.SD_FILESADDED,
-                    sidedatamod.SD_FILESREMOVED,
-                )
-        return False, f, {}
-
-    return sidedatacompanion
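
A few notes on the helpers removed above; they move, essentially unchanged, into the dedicated module named in the commit message. encodecopies/decodecopies serialize a {newname: oldname} dictionary, as produced by computechangesetcopies, against the changeset's sorted file list: each copy is stored as an "<index-of-dst>\0<source-path>" record, newline-separated. Below is a minimal standalone sketch of that round trip; it uses plain Python and invented file names rather than Mercurial's own objects:

    # Standalone sketch of the copies encoding; file names are invented.
    def encodecopies(files, copies):
        # `files` is the changeset's sorted file list; every copy target
        # must be present in it.
        items = []
        for i, dst in enumerate(files):
            if dst in copies:
                items.append(b'%d\0%s' % (i, copies[dst]))
        assert len(items) == len(copies), 'copy target missing from file list'
        return b'\n'.join(items)

    def decodecopies(files, data):
        copies = {}
        if not data:
            return copies
        for line in data.split(b'\n'):
            strindex, src = line.split(b'\0')
            copies[files[int(strindex)]] = src
        return copies

    files = sorted([b'a.txt', b'b.txt', b'c.txt'])
    p1copies = {b'b.txt': b'a.txt'}  # b.txt was copied from a.txt

    blob = encodecopies(files, p1copies)
    assert blob == b'1\x00a.txt'  # b.txt has index 1 in the sorted list
    assert decodecopies(files, blob) == p1copies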
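
encodefileindices/decodefileindices apply the same idea to the added and removed file sets: each member is stored as its decimal index into the sorted file list, one index per line. A sketch under the same assumptions (invented file names):

    # Standalone sketch of the file-indices encoding; names are invented.
    def encodefileindices(files, subset):
        subset = set(subset)
        return b'\n'.join(b'%d' % i for i, f in enumerate(files) if f in subset)

    def decodefileindices(files, data):
        if not data:
            return []
        return [files[int(chunk)] for chunk in data.split(b'\n')]

    files = [b'a.txt', b'b.txt', b'c.txt']
    added = [b'a.txt', b'c.txt']
    blob = encodefileindices(files, added)
    assert blob == b'0\n2'
    assert decodefileindices(files, blob) == added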
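
The parallel adder caps memory use with a bounded semaphore of nbworkers * BUFF_PER_WORKER tokens: a worker must acquire a token before taking a task, and the consumer releases one per result it drains, so at most that many computed-but-unconsumed results can exist at any time. Here is a self-contained toy version of the same producer/consumer pattern, with squaring numbers standing in for _getsidedata:

    import multiprocessing

    BUFF_PER_WORKER = 5  # same role as the constant above, smaller for the demo

    def _worker(tasks, results, tokens):
        # Mirrors _sidedata_worker: acquire a token, fetch a task, compute,
        # publish; `None` is the stop signal.
        tokens.acquire()
        n = tasks.get()
        while n is not None:
            results.put((n, n * n))  # stand-in for _getsidedata()
            tokens.acquire()
            n = tasks.get()
        tokens.release()  # token taken while fetching the `None` marker

    if __name__ == '__main__':
        nbworkers = 2
        tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
        tasks = multiprocessing.Queue()
        results = multiprocessing.Queue()

        for n in range(20):          # queue all tasks up front...
            tasks.put(n)
        for _ in range(nbworkers):   # ...then one stop marker per worker
            tasks.put(None)

        workers = [
            multiprocessing.Process(target=_worker, args=(tasks, results, tokens))
            for _ in range(nbworkers)
        ]
        for w in workers:
            w.start()

        # The consumer releases one token per drained result; that is the
        # backpressure capping the number of unconsumed results.
        seen = {}
        for _ in range(20):
            n, sq = results.get()
            tokens.release()
            seen[n] = sq
        for w in workers:
            w.join()
        assert seen[7] == 49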
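
All three entry points above return a "sidedata companion" callback with the same shape. Judging only from the call sites in this file, the returned triple appears to be (drop-all-sidedata flag, keys to remove, entries to add or update): the adders return (False, (), sidedata) while the remover returns (False, <the four SD_* keys>, {}). A schematic, hypothetical companion showing both directions; the SD_* values and the wants_copy_data switch are stand-ins for illustration, not Mercurial API:

    # Hypothetical companion illustrating the inferred
    # (dropall, removekeys, update) triple; SD_* values are stand-ins.
    SD_P1COPIES, SD_P2COPIES = 8, 9

    def make_companion(compute, wants_copy_data):
        def companion(revlog, rev):
            if wants_copy_data:
                # add/update the freshly computed entries
                return False, (), compute(rev)
            # otherwise ask for the copy-related keys to be stripped
            return False, (SD_P1COPIES, SD_P2COPIES), {}
        return companion

    strip = make_companion(lambda rev: {}, wants_copy_data=False)
    assert strip(None, 0) == (False, (SD_P1COPIES, SD_P2COPIES), {})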