comparison hgext/journal.py @ 29503:0103b673d6ca

journal: add share extension support Rather than put everything into one journal file, split entries up into *shared* and *local* entries. Working copy changes are local to a specific working copy, so should remain local only. Other entries are shared with the source if so configured when the share was created. When unsharing, any shared journal entries are copied across.
author Martijn Pieters <mjpieters@fb.com>
date Mon, 11 Jul 2016 14:45:41 +0100
parents 8361131b4768
children 7503d8874617
comparison
equal deleted inserted replaced
29502:8361131b4768 29503:0103b673d6ca
12 """ 12 """
13 13
14 from __future__ import absolute_import 14 from __future__ import absolute_import
15 15
16 import collections 16 import collections
17 import errno
17 import os 18 import os
18 import weakref 19 import weakref
19 20
20 from mercurial.i18n import _ 21 from mercurial.i18n import _
21 22
25 commands, 26 commands,
26 dirstate, 27 dirstate,
27 dispatch, 28 dispatch,
28 error, 29 error,
29 extensions, 30 extensions,
31 hg,
30 localrepo, 32 localrepo,
31 lock, 33 lock,
32 node, 34 node,
33 util, 35 util,
34 ) 36 )
35 37
38 from . import share
39
36 cmdtable = {} 40 cmdtable = {}
37 command = cmdutil.command(cmdtable) 41 command = cmdutil.command(cmdtable)
38 42
39 # Note for extension authors: ONLY specify testedwith = 'internal' for 43 # Note for extension authors: ONLY specify testedwith = 'internal' for
40 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should 44 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
46 storageversion = 0 50 storageversion = 0
47 51
48 # namespaces 52 # namespaces
49 bookmarktype = 'bookmark' 53 bookmarktype = 'bookmark'
50 wdirparenttype = 'wdirparent' 54 wdirparenttype = 'wdirparent'
55 # In a shared repository, what shared feature name is used
56 # to indicate this namespace is shared with the source?
57 sharednamespaces = {
58 bookmarktype: hg.sharedbookmarks,
59 }
51 60
52 # Journal recording, register hooks and storage object 61 # Journal recording, register hooks and storage object
53 def extsetup(ui): 62 def extsetup(ui):
54 extensions.wrapfunction(dispatch, 'runcommand', runcommand) 63 extensions.wrapfunction(dispatch, 'runcommand', runcommand)
55 extensions.wrapfunction(bookmarks.bmstore, '_write', recordbookmarks) 64 extensions.wrapfunction(bookmarks.bmstore, '_write', recordbookmarks)
56 extensions.wrapfunction( 65 extensions.wrapfunction(
57 dirstate.dirstate, '_writedirstate', recorddirstateparents) 66 dirstate.dirstate, '_writedirstate', recorddirstateparents)
58 extensions.wrapfunction( 67 extensions.wrapfunction(
59 localrepo.localrepository.dirstate, 'func', wrapdirstate) 68 localrepo.localrepository.dirstate, 'func', wrapdirstate)
69 extensions.wrapfunction(hg, 'postshare', wrappostshare)
70 extensions.wrapfunction(hg, 'copystore', unsharejournal)
60 71
61 def reposetup(ui, repo): 72 def reposetup(ui, repo):
62 if repo.local(): 73 if repo.local():
63 repo.journal = journalstorage(repo) 74 repo.journal = journalstorage(repo)
64 75
112 oldvalue = oldmarks.get(mark, node.nullid) 123 oldvalue = oldmarks.get(mark, node.nullid)
113 if value != oldvalue: 124 if value != oldvalue:
114 repo.journal.record(bookmarktype, mark, oldvalue, value) 125 repo.journal.record(bookmarktype, mark, oldvalue, value)
115 return orig(store, fp) 126 return orig(store, fp)
116 127
128 # shared repository support
129 def _readsharedfeatures(repo):
130 """A set of shared features for this repository"""
131 try:
132 return set(repo.vfs.read('shared').splitlines())
133 except IOError as inst:
134 if inst.errno != errno.ENOENT:
135 raise
136 return set()
137
138 def _mergeentriesiter(*iterables, **kwargs):
139 """Given a set of sorted iterables, yield the next entry in merged order
140
141 Note that by default entries go from most recent to oldest.
142 """
143 order = kwargs.pop('order', max)
144 iterables = [iter(it) for it in iterables]
145 # this tracks still active iterables; iterables are deleted as they are
146 # exhausted, which is why this is a dictionary and why each entry also
147 # stores the key. Entries are mutable so we can store the next value each
148 # time.
149 iterable_map = {}
150 for key, it in enumerate(iterables):
151 try:
152 iterable_map[key] = [next(it), key, it]
153 except StopIteration:
154 # empty entry, can be ignored
155 pass
156
157 while iterable_map:
158 value, key, it = order(iterable_map.itervalues())
159 yield value
160 try:
161 iterable_map[key][0] = next(it)
162 except StopIteration:
163 # this iterable is empty, remove it from consideration
164 del iterable_map[key]
165
166 def wrappostshare(orig, sourcerepo, destrepo, **kwargs):
167 """Mark this shared working copy as sharing journal information"""
168 orig(sourcerepo, destrepo, **kwargs)
169 with destrepo.vfs('shared', 'a') as fp:
170 fp.write('journal\n')
171
172 def unsharejournal(orig, ui, repo, repopath):
173 """Copy shared journal entries into this repo when unsharing"""
174 if (repo.path == repopath and repo.shared() and
175 util.safehasattr(repo, 'journal')):
176 sharedrepo = share._getsrcrepo(repo)
177 sharedfeatures = _readsharedfeatures(repo)
178 if sharedrepo and sharedfeatures > set(['journal']):
179 # there is a shared repository and there are shared journal entries
180 # to copy. move shared data over from source to destination but
181 # move the local file first
182 if repo.vfs.exists('journal'):
183 journalpath = repo.join('journal')
184 util.rename(journalpath, journalpath + '.bak')
185 storage = repo.journal
186 local = storage._open(
187 repo.vfs, filename='journal.bak', _newestfirst=False)
188 shared = (
189 e for e in storage._open(sharedrepo.vfs, _newestfirst=False)
190 if sharednamespaces.get(e.namespace) in sharedfeatures)
191 for entry in _mergeentriesiter(local, shared, order=min):
192 storage._write(repo.vfs, entry)
193
194 return orig(ui, repo, repopath)
195
117 class journalentry(collections.namedtuple( 196 class journalentry(collections.namedtuple(
118 'journalentry', 197 'journalentry',
119 'timestamp user command namespace name oldhashes newhashes')): 198 'timestamp user command namespace name oldhashes newhashes')):
120 """Individual journal entry 199 """Individual journal entry
121 200
155 oldhashes, newhashes)) 234 oldhashes, newhashes))
156 235
157 class journalstorage(object): 236 class journalstorage(object):
158 """Storage for journal entries 237 """Storage for journal entries
159 238
239 Entries are divided over two files; one with entries that pertain to the
240 local working copy *only*, and one with entries that are shared across
241 multiple working copies when shared using the share extension.
242
160 Entries are stored with NUL bytes as separators. See the journalentry 243 Entries are stored with NUL bytes as separators. See the journalentry
161 class for the per-entry structure. 244 class for the per-entry structure.
162 245
163 The file format starts with an integer version, delimited by a NUL. 246 The file format starts with an integer version, delimited by a NUL.
164 247
172 255
173 def __init__(self, repo): 256 def __init__(self, repo):
174 self.user = util.getuser() 257 self.user = util.getuser()
175 self.ui = repo.ui 258 self.ui = repo.ui
176 self.vfs = repo.vfs 259 self.vfs = repo.vfs
260
261 # is this working copy using a shared storage?
262 self.sharedfeatures = self.sharedvfs = None
263 if repo.shared():
264 features = _readsharedfeatures(repo)
265 sharedrepo = share._getsrcrepo(repo)
266 if sharedrepo is not None and 'journal' in features:
267 self.sharedvfs = sharedrepo.vfs
268 self.sharedfeatures = features
177 269
178 # track the current command for recording in journal entries 270 # track the current command for recording in journal entries
179 @property 271 @property
180 def command(self): 272 def command(self):
181 commandstr = ' '.join( 273 commandstr = ' '.join(
190 """Set the current hg arguments, stored with recorded entries""" 282 """Set the current hg arguments, stored with recorded entries"""
191 # Set the current command on the class because we may have started 283 # Set the current command on the class because we may have started
192 # with a non-local repo (cloning for example). 284 # with a non-local repo (cloning for example).
193 cls._currentcommand = fullargs 285 cls._currentcommand = fullargs
194 286
195 def jlock(self): 287 def jlock(self, vfs):
196 """Create a lock for the journal file""" 288 """Create a lock for the journal file"""
197 if self._lockref and self._lockref(): 289 if self._lockref and self._lockref():
198 raise error.Abort(_('journal lock does not support nesting')) 290 raise error.Abort(_('journal lock does not support nesting'))
199 desc = _('journal of %s') % self.vfs.base 291 desc = _('journal of %s') % vfs.base
200 try: 292 try:
201 l = lock.lock(self.vfs, 'journal.lock', 0, desc=desc) 293 l = lock.lock(vfs, 'journal.lock', 0, desc=desc)
202 except error.LockHeld as inst: 294 except error.LockHeld as inst:
203 self.ui.warn( 295 self.ui.warn(
204 _("waiting for lock on %s held by %r\n") % (desc, inst.locker)) 296 _("waiting for lock on %s held by %r\n") % (desc, inst.locker))
205 # default to 600 seconds timeout 297 # default to 600 seconds timeout
206 l = lock.lock( 298 l = lock.lock(
207 self.vfs, 'journal.lock', 299 vfs, 'journal.lock',
208 int(self.ui.config("ui", "timeout", "600")), desc=desc) 300 int(self.ui.config("ui", "timeout", "600")), desc=desc)
209 self.ui.warn(_("got lock after %s seconds\n") % l.delay) 301 self.ui.warn(_("got lock after %s seconds\n") % l.delay)
210 self._lockref = weakref.ref(l) 302 self._lockref = weakref.ref(l)
211 return l 303 return l
212 304
229 321
230 entry = journalentry( 322 entry = journalentry(
231 util.makedate(), self.user, self.command, namespace, name, 323 util.makedate(), self.user, self.command, namespace, name,
232 oldhashes, newhashes) 324 oldhashes, newhashes)
233 325
234 with self.jlock(): 326 vfs = self.vfs
327 if self.sharedvfs is not None:
328 # write to the shared repository if this feature is being
329 # shared between working copies.
330 if sharednamespaces.get(namespace) in self.sharedfeatures:
331 vfs = self.sharedvfs
332
333 self._write(vfs, entry)
334
335 def _write(self, vfs, entry):
336 with self.jlock(vfs):
235 version = None 337 version = None
236 # open file in append mode to ensure it is created if missing 338 # open file in append mode to ensure it is created if missing
237 with self.vfs('journal', mode='a+b', atomictemp=True) as f: 339 with vfs('journal', mode='a+b', atomictemp=True) as f:
238 f.seek(0, os.SEEK_SET) 340 f.seek(0, os.SEEK_SET)
239 # Read just enough bytes to get a version number (up to 2 341 # Read just enough bytes to get a version number (up to 2
240 # digits plus separator) 342 # digits plus separator)
241 version = f.read(3).partition('\0')[0] 343 version = f.read(3).partition('\0')[0]
242 if version and version != str(storageversion): 344 if version and version != str(storageversion):
271 """Iterate over the storage 373 """Iterate over the storage
272 374
273 Yields journalentry instances for each contained journal record. 375 Yields journalentry instances for each contained journal record.
274 376
275 """ 377 """
276 if not self.vfs.exists('journal'): 378 local = self._open(self.vfs)
379
380 if self.sharedvfs is None:
381 return local
382
383 # iterate over both local and shared entries, but only those
384 # shared entries that are among the currently shared features
385 shared = (
386 e for e in self._open(self.sharedvfs)
387 if sharednamespaces.get(e.namespace) in self.sharedfeatures)
388 return _mergeentriesiter(local, shared)
389
390 def _open(self, vfs, filename='journal', _newestfirst=True):
391 if not vfs.exists(filename):
277 return 392 return
278 393
279 with self.vfs('journal') as f: 394 with vfs(filename) as f:
280 raw = f.read() 395 raw = f.read()
281 396
282 lines = raw.split('\0') 397 lines = raw.split('\0')
283 version = lines and lines[0] 398 version = lines and lines[0]
284 if version != str(storageversion): 399 if version != str(storageversion):
285 version = version or _('not available') 400 version = version or _('not available')
286 raise error.Abort(_("unknown journal file version '%s'") % version) 401 raise error.Abort(_("unknown journal file version '%s'") % version)
287 402
288 # Skip the first line, it's a version number. Reverse the rest. 403 # Skip the first line, it's a version number. Normally we iterate over
289 lines = reversed(lines[1:]) 404 # these in reverse order to list newest first; only when copying across
405 # a shared storage do we forgo reversing.
406 lines = lines[1:]
407 if _newestfirst:
408 lines = reversed(lines)
290 for line in lines: 409 for line in lines:
291 if not line: 410 if not line:
292 continue 411 continue
293 yield journalentry.fromstorage(line) 412 yield journalentry.fromstorage(line)
294 413