comparison mercurial/changelog.py @ 23201:7e97bf6ee2d6

changelog: rework the delayupdate mechanism The current way we use the 'delayupdate' mechanism is wrong. We call 'delayupdate' right after the transaction retrieval, then we call 'finalize' right before calling 'tr.close()'. The 'finalize' call will -always- result in a flush to disk, making the data available to all readers. But the 'tr.close()' may be a no-op if the transaction is nested. This would result in data being: 1) exposed to readers too early, 2) rolled back by another part of the transaction after such exposure. So we need to end up in a situation where we call 'finalize' a single time when the transaction actually closes. For this purpose we need to be able to call 'delayupdate' and '_writepending' multiple times and 'finalize' once. This was not possible with the previous state of the code. This changeset refactors the code to make this possible. We buffer data in memory as much as possible and fall back to writing to a ".a" file after the first call to '_writepending'.
author Pierre-Yves David <pierre-yves.david@fb.com>
date Sat, 18 Oct 2014 01:12:18 -0700
parents fe5f044b753d
children 3872d563e01a
comparison
equal deleted inserted replaced
23200:48a2eefd3d20 23201:7e97bf6ee2d6
106 106
107 def write(self, s): 107 def write(self, s):
108 self.data.append(str(s)) 108 self.data.append(str(s))
109 self.offset += len(s) 109 self.offset += len(s)
110 110
111 def delayopener(opener, target, divert, buf): 111 def _divertopener(opener, target):
112 def o(name, mode='r'): 112 """build an opener that writes in 'target.a' instead of 'target'"""
113 def _divert(name, mode='r'):
113 if name != target: 114 if name != target:
114 return opener(name, mode) 115 return opener(name, mode)
115 if divert: 116 return opener(name + ".a", mode)
116 return opener(name + ".a", mode.replace('a', 'w')) 117 return _divert
117 # otherwise, divert to memory 118
119 def _delayopener(opener, target, buf):
120 """build an opener that stores chunks in 'buf' instead of 'target'"""
121 def _delay(name, mode='r'):
122 if name != target:
123 return opener(name, mode)
118 return appender(opener, name, mode, buf) 124 return appender(opener, name, mode, buf)
119 return o 125 return _delay
120 126
121 class changelog(revlog.revlog): 127 class changelog(revlog.revlog):
122 def __init__(self, opener): 128 def __init__(self, opener):
123 revlog.revlog.__init__(self, opener, "00changelog.i") 129 revlog.revlog.__init__(self, opener, "00changelog.i")
124 if self._initempty: 130 if self._initempty:
125 # changelogs don't benefit from generaldelta 131 # changelogs don't benefit from generaldelta
126 self.version &= ~revlog.REVLOGGENERALDELTA 132 self.version &= ~revlog.REVLOGGENERALDELTA
127 self._generaldelta = False 133 self._generaldelta = False
128 self._realopener = opener 134 self._realopener = opener
129 self._delayed = False 135 self._delayed = False
130 self._delaybuf = [] 136 self._delaybuf = None
131 self._divert = False 137 self._divert = False
132 self.filteredrevs = frozenset() 138 self.filteredrevs = frozenset()
133 139
134 def tip(self): 140 def tip(self):
135 """filtered version of revlog.tip""" 141 """filtered version of revlog.tip"""
218 raise error.FilteredIndexError(rev) 224 raise error.FilteredIndexError(rev)
219 return super(changelog, self).flags(rev) 225 return super(changelog, self).flags(rev)
220 226
221 def delayupdate(self): 227 def delayupdate(self):
222 "delay visibility of index updates to other readers" 228 "delay visibility of index updates to other readers"
229
230 if not self._delayed:
231 if len(self) == 0:
232 self._divert = True
233 if self._realopener.exists(self.indexfile + '.a'):
234 self._realopener.unlink(self.indexfile + '.a')
235 self.opener = _divertopener(self._realopener, self.indexfile)
236 else:
237 self._delaybuf = []
238 self.opener = _delayopener(self._realopener, self.indexfile,
239 self._delaybuf)
223 self._delayed = True 240 self._delayed = True
224 self._divert = (len(self) == 0)
225 self._delaybuf = []
226 self.opener = delayopener(self._realopener, self.indexfile,
227 self._divert, self._delaybuf)
228 241
229 def finalize(self, tr): 242 def finalize(self, tr):
230 "finalize index updates" 243 "finalize index updates"
231 self._delayed = False 244 self._delayed = False
232 self.opener = self._realopener 245 self.opener = self._realopener
233 # move redirected index data back into place 246 # move redirected index data back into place
234 if self._divert: 247 if self._divert:
248 assert not self._delaybuf
235 tmpname = self.indexfile + ".a" 249 tmpname = self.indexfile + ".a"
236 nfile = self.opener.open(tmpname) 250 nfile = self.opener.open(tmpname)
237 nfile.close() 251 nfile.close()
238 self.opener.rename(tmpname, self.indexfile) 252 self.opener.rename(tmpname, self.indexfile)
239 elif self._delaybuf: 253 elif self._delaybuf:
240 fp = self.opener(self.indexfile, 'a') 254 fp = self.opener(self.indexfile, 'a')
241 fp.write("".join(self._delaybuf)) 255 fp.write("".join(self._delaybuf))
242 fp.close() 256 fp.close()
243 self._delaybuf = [] 257 self._delaybuf = None
258 self._divert = False
244 # split when we're done 259 # split when we're done
245 self.checkinlinesize(tr) 260 self.checkinlinesize(tr)
246 261
247 def readpending(self, file): 262 def readpending(self, file):
248 r = revlog.revlog(self.opener, file) 263 r = revlog.revlog(self.opener, file)
260 fp2.write(fp1.read()) 275 fp2.write(fp1.read())
261 # add pending data 276 # add pending data
262 fp2.write("".join(self._delaybuf)) 277 fp2.write("".join(self._delaybuf))
263 fp2.close() 278 fp2.close()
264 # switch modes so finalize can simply rename 279 # switch modes so finalize can simply rename
265 self._delaybuf = [] 280 self._delaybuf = None
266 self._divert = True 281 self._divert = True
282 self.opener = _divertopener(self._realopener, self.indexfile)
267 283
268 if self._divert: 284 if self._divert:
269 return True 285 return True
270 286
271 return False 287 return False