comparison hgext/fsmonitor/__init__.py @ 28433:3b67f27bb908

fsmonitor: new experimental extension. Extension to plug into a Watchman daemon, speeding up hg status calls by relying on OS events to tell us what files have changed. Originally developed at https://bitbucket.org/facebook/hgwatchman
author Martijn Pieters <mjpieters@fb.com>
date Thu, 03 Mar 2016 14:29:19 +0000
parents
children 49d65663d7e4
comparison
equal deleted inserted replaced
28432:2377c4ac4eec 28433:3b67f27bb908
1 # __init__.py - fsmonitor initialization and overrides
2 #
3 # Copyright 2013-2016 Facebook, Inc.
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7
8 '''Faster status operations with the Watchman file monitor (EXPERIMENTAL)
9
10 Integrates the file-watching program Watchman with Mercurial to produce faster
11 status results.
12
13 On a particular Linux system, for a real-world repository with over 400,000
14 files hosted on ext4, vanilla `hg status` takes 1.3 seconds. On the same
15 system, with fsmonitor it takes about 0.3 seconds.
16
17 fsmonitor requires no configuration -- it will tell Watchman about your
18 repository as necessary. You'll need to install Watchman from
19 https://facebook.github.io/watchman/ and make sure it is in your PATH.
20
21 The following configuration options exist:
22
23 ::
24
25 [fsmonitor]
26 mode = {off, on, paranoid}
27
28 When `mode = off`, fsmonitor will disable itself (similar to not loading the
29 extension at all). When `mode = on`, fsmonitor will be enabled (the default).
30 When `mode = paranoid`, fsmonitor will query both Watchman and the filesystem,
31 and ensure that the results are consistent.
32
33 ::
34
35 [fsmonitor]
36 timeout = (float)
37
38 A value, in seconds, that determines how long fsmonitor will wait for Watchman
39 to return results. Defaults to `2.0`.
40
41 ::
42
43 [fsmonitor]
44 blacklistusers = (list of userids)
45
46 A list of usernames for which fsmonitor will disable itself altogether.
47
48 ::
49
50 [fsmonitor]
51 walk_on_invalidate = (boolean)
52
53 Whether or not to walk the whole repo ourselves when our cached state has been
54 invalidated, for example when Watchman has been restarted or .hgignore rules
55 have been changed. Walking the repo in that case can result in competing for
56 I/O with Watchman. For large repos it is recommended to set this value to
57 false. You may wish to set this to true if you have a very fast filesystem
58 that can outpace the IPC overhead of getting the result data for the full repo
59 from Watchman. Defaults to false.
60
61 fsmonitor is incompatible with the largefiles and eol extensions, and
62 will disable itself if any of those are active.
63
64 '''
65
66 # Platforms Supported
67 # ===================
68 #
69 # **Linux:** *Stable*. Watchman and fsmonitor are both known to work reliably,
70 # even under severe loads.
71 #
72 # **Mac OS X:** *Stable*. The Mercurial test suite passes with fsmonitor
73 # turned on, on case-insensitive HFS+. There has been a reasonable amount of
74 # user testing under normal loads.
75 #
76 # **Solaris, BSD:** *Alpha*. watchman and fsmonitor are believed to work, but
77 # very little testing has been done.
78 #
79 # **Windows:** *Alpha*. Not in a release version of watchman or fsmonitor yet.
80 #
81 # Known Issues
82 # ============
83 #
84 # * fsmonitor will disable itself if any of the following extensions are
85 # enabled: largefiles, eol; or if the repository has subrepos.
86 # * fsmonitor will produce incorrect results if nested repos that are not
87 # subrepos exist. *Workaround*: add nested repo paths to your `.hgignore`.
88 #
89 # The issues related to nested repos and subrepos are probably not fundamental
90 # ones. Patches to fix them are welcome.
91
92 from __future__ import absolute_import
93
94 import os
95 import stat
96 import sys
97
98 from mercurial import (
99 context,
100 extensions,
101 localrepo,
102 pathutil,
103 scmutil,
104 util,
105 )
106 from mercurial import match as matchmod
107 from mercurial.i18n import _
108
109 from . import (
110 state,
111 watchmanclient,
112 )
113
114 # Note for extension authors: ONLY specify testedwith = 'internal' for
115 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
116 # be specifying the version(s) of Mercurial they are tested with, or
117 # leave the attribute unspecified.
118 testedwith = 'internal'
119
120 # This extension is incompatible with the following blacklisted extensions
121 # and will disable itself when encountering one of these:
# (reposetup() compares these names against extensions.enabled() and, on the
# first hit, warns and returns without setting fsmonitor up for the repo.)
122 _blacklist = ['largefiles', 'eol']
123
def _handleunavailable(ui, state, ex):
    """Report an exception raised while talking to Watchman.

    A ``watchmanclient.Unavailable`` error carries its own ``warn`` and
    ``invalidate`` flags: the user is warned and/or the fsmonitor state is
    invalidated as those flags request, and the error's message is logged.
    Any other exception is only logged.
    """
    if not isinstance(ex, watchmanclient.Unavailable):
        ui.log('fsmonitor', 'Watchman exception: %s\n', ex)
        return

    if ex.warn:
        ui.warn(str(ex) + '\n')
    if ex.invalidate:
        state.invalidate()
    ui.log('fsmonitor', 'Watchman unavailable: %s\n', ex.msg)
134
135 def _hashignore(ignore):
136 """Calculate hash for ignore patterns and filenames
137
138 If this information changes between Mercurial invocations, we can't
139 rely on Watchman information anymore and have to re-scan the working
140 copy.
141
142 """
# The digest is built from whichever of the matcher attributes includepat,
# excludepat, patternspat and _files are present on ``ignore``; the hex
# digest string is returned.
# NOTE(review): indentation was lost in this listing. The '\0\0' / '\0'
# updates are presumably unconditional field separators (so a missing
# attribute still leaves its slot delimited) -- confirm against the real file.
143 sha1 = util.sha1()
144 if util.safehasattr(ignore, 'includepat'):
145 sha1.update(ignore.includepat)
146 sha1.update('\0\0')
147 if util.safehasattr(ignore, 'excludepat'):
148 sha1.update(ignore.excludepat)
149 sha1.update('\0\0')
150 if util.safehasattr(ignore, 'patternspat'):
151 sha1.update(ignore.patternspat)
152 sha1.update('\0\0')
153 if util.safehasattr(ignore, '_files'):
# presumably the names of the ignore files themselves -- verify
154 for f in ignore._files:
155 sha1.update(f)
156 sha1.update('\0')
157 return sha1.hexdigest()
158
159 def overridewalk(orig, self, match, subrepos, unknown, ignored, full=True):
160 '''Replacement for dirstate.walk, hooking into Watchman.
161
162 Whenever full is False, ignored is False, and the Watchman client is
163 available, use Watchman combined with saved state to possibly return only a
164 subset of files.'''
# Fallback: run the wrapped walk, always as a full walk.
165 def bail():
166 return orig(match, subrepos, unknown, ignored, full=True)
167
# A full walk, a request for ignored files, or an unavailable Watchman client
# cannot be answered from Watchman data.
168 if full or ignored or not self._watchmanclient.available():
169 return bail()
# Saved fsmonitor state: the clock, the ignore hash and the list of files that
# overridestatus() stored through state.set() after an earlier status run.
170 state = self._fsmonitorstate
171 clock, ignorehash, notefiles = state.get()
# No saved clock: either walk the tree ourselves or query Watchman from the
# null clock with no remembered files.
172 if not clock:
173 if state.walk_on_invalidate:
174 return bail()
175 # Initial NULL clock value, see
176 # https://facebook.github.io/watchman/docs/clockspec.html
177 clock = 'c:0:0'
178 notefiles = []
179
# NOTE(review): fwarn and badtype are not referenced anywhere else in this
# function as shown here.
180 def fwarn(f, msg):
181 self._ui.warn('%s: %s\n' % (self.pathto(f), msg))
182 return False
183
184 def badtype(mode):
185 kind = _('unknown')
186 if stat.S_ISCHR(mode):
187 kind = _('character device')
188 elif stat.S_ISBLK(mode):
189 kind = _('block device')
190 elif stat.S_ISFIFO(mode):
191 kind = _('fifo')
192 elif stat.S_ISSOCK(mode):
193 kind = _('socket')
194 elif stat.S_ISDIR(mode):
195 kind = _('directory')
196 return _('unsupported file type (type is %s)') % kind
197
198 ignore = self._ignore
199 dirignore = self._dirignore
# Ignore rules only matter when unknown files were requested; otherwise every
# path is treated as ignored, so untracked files are never added to results.
200 if unknown:
201 if _hashignore(ignore) != ignorehash and clock != 'c:0:0':
202 # ignore list changed -- can't rely on Watchman state any more
203 if state.walk_on_invalidate:
204 return bail()
205 notefiles = []
206 clock = 'c:0:0'
207 else:
208 # always ignore
209 ignore = util.always
210 dirignore = util.always
211
212 matchfn = match.matchfn
213 matchalways = match.always()
214 dmap = self._map
# _nonnormalset may be absent on this dirstate; None selects the full dirstate
# scan in step 3 below.
215 nonnormalset = getattr(self, '_nonnormalset', None)
216
217 copymap = self._copymap
218 getkind = stat.S_IFMT
219 dirkind = stat.S_IFDIR
220 regkind = stat.S_IFREG
221 lnkkind = stat.S_IFLNK
222 join = self._join
223 normcase = util.normcase
224 fresh_instance = False
225
226 exact = skipstep3 = False
227 if matchfn == match.exact: # match.exact
228 exact = True
229 dirignore = util.always # skip step 2
230 elif match.files() and not match.anypats(): # match.match, no patterns
231 skipstep3 = True
232
233 if not exact and self._checkcase:
234 # note that even though we could receive directory entries, we're only
235 # interested in checking if a file with the same name exists. So only
236 # normalize files if possible.
237 normalize = self._normalizefile
238 skipstep3 = False
239 else:
240 normalize = None
241
242 # step 1: find all explicit files
243 results, work, dirsnotfound = self._walkexplicit(match, subrepos)
244
245 skipstep3 = skipstep3 and not (work or dirsnotfound)
246 work = [d for d in work if not dirignore(d[0])]
247
# Nothing left to traverse and only explicit files were asked for: the step 1
# results are final. Remove the subrepo and '.hg' entries before returning.
248 if not work and (exact or skipstep3):
249 for s in subrepos:
250 del results[s]
251 del results['.hg']
252 return results
253
254 # step 2: query Watchman
255 try:
256 # Use the user-configured timeout for the query.
257 # Add a little slack over the top of the user query to allow for
258 # overheads while transferring the data
259 self._watchmanclient.settimeout(state.timeout + 0.1)
260 result = self._watchmanclient.command('query', {
261 'fields': ['mode', 'mtime', 'size', 'exists', 'name'],
# presumably restricts the reply to files changed since this clock -- see
# the Watchman query documentation
262 'since': clock,
# exclude the .hg directory itself and (presumably) everything beneath it
263 'expression': [
264 'not', [
265 'anyof', ['dirname', '.hg'],
266 ['name', '.hg', 'wholename']
267 ]
268 ],
# state.timeout is multiplied by 1000 here (seconds -> milliseconds, going by
# the module docstring's description of the timeout option)
269 'sync_timeout': int(state.timeout * 1000),
270 'empty_on_fresh_instance': state.walk_on_invalidate,
271 })
272 except Exception as ex:
# Any failure while querying is reported, the connection is dropped, and the
# wrapped walk is used instead.
273 _handleunavailable(self._ui, state, ex)
274 self._watchmanclient.clearconnection()
275 return bail()
276 else:
277 # We need to propagate the last observed clock up so that we
278 # can use it for our next query
279 state.setlastclock(result['clock'])
280 if result['is_fresh_instance']:
281 if state.walk_on_invalidate:
282 state.invalidate()
283 return bail()
284 fresh_instance = True
285 # Ignore any prior noteable files from the state info
286 notefiles = []
287
288 # for file paths which require normalization and we encounter a case
289 # collision, we store our own foldmap
290 if normalize:
291 foldmap = dict((normcase(k), k) for k in results)
292
293 switch_slashes = os.sep == '\\'
294 # The order of the results is, strictly speaking, undefined.
295 # For case changes on a case insensitive filesystem we may receive
296 # two entries, one with exists=True and another with exists=False.
297 # The exists=True entries in the same response should be interpreted
298 # as being happens-after the exists=False entries due to the way that
299 # Watchman tracks files. We use this property to reconcile deletes
300 # for name case changes.
301 for entry in result['files']:
302 fname = entry['name']
303 if switch_slashes:
304 fname = fname.replace('\\', '/')
305 if normalize:
306 normed = normcase(fname)
307 fname = normalize(fname, True, True)
308 foldmap[normed] = fname
309 fmode = entry['mode']
310 fexists = entry['exists']
311 kind = getkind(fmode)
312
313 if not fexists:
314 # if marked as deleted and we don't already have a change
315 # record, mark it as deleted. If we already have an entry
316 # for fname then it was either part of walkexplicit or was
317 # an earlier result that was a case change
318 if fname not in results and fname in dmap and (
319 matchalways or matchfn(fname)):
320 results[fname] = None
# a directory is only reported (as None) when its name is also in the dirstate
321 elif kind == dirkind:
322 if fname in dmap and (matchalways or matchfn(fname)):
323 results[fname] = None
# regular files and symlinks: tracked ones are reported when they match;
# untracked ones must additionally not be ignored
324 elif kind == regkind or kind == lnkkind:
325 if fname in dmap:
326 if matchalways or matchfn(fname):
327 results[fname] = entry
328 elif (matchalways or matchfn(fname)) and not ignore(fname):
329 results[fname] = entry
# any other file type is reported as None if the name is in the dirstate
330 elif fname in dmap and (matchalways or matchfn(fname)):
331 results[fname] = None
332
333 # step 3: query notable files we don't already know about
334 # XXX try not to iterate over the entire dmap
335 if normalize:
336 # any notable files that have changed case will already be handled
337 # above, so just check membership in the foldmap
338 notefiles = set((normalize(f, True, True) for f in notefiles
339 if normcase(f) not in foldmap))
340 visit = set((f for f in notefiles if (f not in results and matchfn(f)
341 and (f in dmap or not ignore(f)))))
342
# Use the precomputed non-normal set when the dirstate has one; otherwise, or
# when Watchman reported a fresh instance, scan the whole dirstate map.
# Copy sources/targets recorded in copymap are always added.
343 if nonnormalset is not None and not fresh_instance:
344 if matchalways:
345 visit.update(f for f in nonnormalset if f not in results)
346 visit.update(f for f in copymap if f not in results)
347 else:
348 visit.update(f for f in nonnormalset
349 if f not in results and matchfn(f))
350 visit.update(f for f in copymap
351 if f not in results and matchfn(f))
352 else:
353 if matchalways:
354 visit.update(f for f, st in dmap.iteritems()
355 if (f not in results and
356 (st[2] < 0 or st[0] != 'n' or fresh_instance)))
357 visit.update(f for f in copymap if f not in results)
358 else:
359 visit.update(f for f, st in dmap.iteritems()
360 if (f not in results and
361 (st[2] < 0 or st[0] != 'n' or fresh_instance)
362 and matchfn(f)))
363 visit.update(f for f in copymap
364 if f not in results and matchfn(f))
365
# Paths for which the path auditor's check is false are reported as None; the
# remaining paths are stat'ed in sorted order.
366 audit = pathutil.pathauditor(self._root).check
367 auditpass = [f for f in visit if audit(f)]
368 auditpass.sort()
369 auditfail = visit.difference(auditpass)
370 for f in auditfail:
371 results[f] = None
372
# assumes util.statfiles yields one result per input path, in input order, so
# nf() pairs each result with its path -- TODO confirm
373 nf = iter(auditpass).next
374 for st in util.statfiles([join(f) for f in auditpass]):
375 f = nf()
376 if st or f in dmap:
377 results[f] = st
378
# As in the early return above, subrepo and '.hg' entries are removed.
379 for s in subrepos:
380 del results[s]
381 del results['.hg']
382 return results
383
def overridestatus(
        orig, self, node1='.', node2=None, match=None, ignored=False,
        clean=False, unknown=False, listsubrepos=False):
    '''Replacement for the repository's status(), keeping fsmonitor state.

    ``orig`` is the wrapped status method and ``self`` the repository; the
    remaining arguments are forwarded to ``orig`` positionally.

    On top of delegating to ``orig`` this function:

    * stores the Watchman clock, the ignore hash and the modified, added,
      removed, deleted and unknown files in the fsmonitor state, but only
      when the working copy is compared against its parent with an
      always-matching matcher outside of a pending transaction;
    * in ``paranoid`` mode, runs status a second time with the Watchman walk
      disabled, writes any differences to a log file and returns the second
      result.

    If the current Watchman clock cannot be obtained, the failure is reported
    through _handleunavailable() and the plain ``orig`` result is returned.

    Returns a ``scmutil.status`` object.
    '''
    listignored = ignored
    listclean = clean
    listunknown = unknown

    def _cmpsets(l1, l2):
        # Log every position at which the Watchman-based result (l1) and the
        # stat-based result (l2) differ. The log goes to $FSMONITOR_LOG_FILE
        # if set, otherwise to 'fsmonitorfail.log' via the repo opener.
        try:
            if 'FSMONITOR_LOG_FILE' in os.environ:
                fn = os.environ['FSMONITOR_LOG_FILE']
                f = open(fn, 'wb')
            else:
                fn = 'fsmonitorfail.log'
                f = self.opener(fn, 'wb')
        except (IOError, OSError):
            self.ui.warn(_('warning: unable to write to %s\n') % fn)
            return

        try:
            for i, (s1, s2) in enumerate(zip(l1, l2)):
                if set(s1) != set(s2):
                    f.write('sets at position %d are unequal\n' % i)
                    f.write('watchman returned: %s\n' % s1)
                    f.write('stat returned: %s\n' % s2)
        finally:
            f.close()

    if isinstance(node1, context.changectx):
        ctx1 = node1
    else:
        ctx1 = self[node1]
    if isinstance(node2, context.changectx):
        ctx2 = node2
    else:
        ctx2 = self[node2]

    working = ctx2.rev() is None
    parentworking = working and ctx1 == self['.']
    match = match or matchmod.always(self.root, self.getcwd())

    # Maybe we can use this opportunity to update Watchman's state.
    # Mercurial uses workingcommitctx and/or memctx to represent the part of
    # the workingctx that is to be committed. So don't update the state in
    # that case.
    # HG_PENDING is set in the environment when the dirstate is being updated
    # in the middle of a transaction; we must not update our state in that
    # case, or we risk forgetting about changes in the working copy.
    updatestate = (parentworking and match.always() and
                   not isinstance(ctx2, (context.workingcommitctx,
                                         context.memctx)) and
                   'HG_PENDING' not in os.environ)

    try:
        if self._fsmonitorstate.walk_on_invalidate:
            # Use a short timeout to query the current clock. If that
            # takes too long then we assume that the service will be slow
            # to answer our query.
            # walk_on_invalidate indicates that we prefer to walk the
            # tree ourselves because we can ignore portions that Watchman
            # cannot and we tend to be faster in the warmer buffer cache
            # cases.
            self._watchmanclient.settimeout(0.1)
        else:
            # Give Watchman more time to potentially complete its walk
            # and return the initial clock. In this mode we assume that
            # the filesystem will be slower than parsing a potentially
            # very large Watchman result set.
            self._watchmanclient.settimeout(
                self._fsmonitorstate.timeout + 0.1)
        startclock = self._watchmanclient.getcurrentclock()
    except Exception as ex:
        self._watchmanclient.clearconnection()
        _handleunavailable(self.ui, self._fsmonitorstate, ex)
        # boo, Watchman failed. bail
        return orig(node1, node2, match, listignored, listclean,
                    listunknown, listsubrepos)

    if updatestate:
        # We need info about unknown files. This may make things slower the
        # first time, but whatever.
        stateunknown = True
    else:
        stateunknown = listunknown

    r = orig(node1, node2, match, listignored, listclean, stateunknown,
             listsubrepos)
    modified, added, removed, deleted, unknown, ignored, clean = r

    if updatestate:
        notefiles = modified + added + removed + deleted + unknown
        # Prefer the clock recorded by the walk itself; fall back to the
        # clock fetched before status ran.
        self._fsmonitorstate.set(
            self._fsmonitorstate.getlastclock() or startclock,
            _hashignore(self.dirstate._ignore),
            notefiles)

    if not listunknown:
        # unknown files were only collected for the state update above
        unknown = []

    # don't do paranoid checks if we're not going to query Watchman anyway
    full = listclean or match.traversedir is not None
    if self._fsmonitorstate.mode == 'paranoid' and not full:
        # run status again and fall back to the old walk this time

        # Open the sink first: if this fails, nothing has been changed yet
        # and there is nothing to restore.
        devnull = open(os.devnull, 'wb')
        quiet = self.ui.quiet
        fout, ferr = self.ui.fout, self.ui.ferr
        try:
            self.dirstate._fsmonitordisable = True
            # shut the UI up
            self.ui.quiet = True
            self.ui.fout = self.ui.ferr = devnull
            rv2 = orig(
                node1, node2, match, listignored, listclean, listunknown,
                listsubrepos)
        finally:
            self.dirstate._fsmonitordisable = False
            self.ui.quiet = quiet
            self.ui.fout, self.ui.ferr = fout, ferr
            # release the handle instead of leaking one per status call
            devnull.close()

        # clean carries no information here: this branch is skipped whenever
        # listclean is set (see `full` above), so presumably both clean
        # lists are empty.
        _cmpsets([modified, added, removed, deleted, unknown, ignored, clean],
                 rv2)
        modified, added, removed, deleted, unknown, ignored, clean = rv2

    return scmutil.status(
        modified, added, removed, deleted, unknown, ignored, clean)
511
def makedirstate(cls):
    """Build a subclass of ``cls`` that is aware of fsmonitor.

    The returned class sends walk() through overridewalk() unless its
    ``_fsmonitordisable`` flag is set, and invalidates the fsmonitor state
    before delegating rebuild() and invalidate() to ``cls``.
    """
    class fsmonitordirstate(cls):
        def _fsmonitorinit(self, fsmonitorstate, watchmanclient):
            """Attach the fsmonitor state and Watchman client."""
            # paranoid mode sets this flag to force the plain walk
            self._fsmonitordisable = False
            self._fsmonitorstate = fsmonitorstate
            self._watchmanclient = watchmanclient

        def walk(self, *args, **kwargs):
            plainwalk = super(fsmonitordirstate, self).walk
            if not self._fsmonitordisable:
                return overridewalk(plainwalk, self, *args, **kwargs)
            return plainwalk(*args, **kwargs)

        def rebuild(self, *args, **kwargs):
            self._fsmonitorstate.invalidate()
            parent = super(fsmonitordirstate, self)
            return parent.rebuild(*args, **kwargs)

        def invalidate(self, *args, **kwargs):
            self._fsmonitorstate.invalidate()
            parent = super(fsmonitordirstate, self)
            return parent.invalidate(*args, **kwargs)

    return fsmonitordirstate
535
def wrapdirstate(orig, self):
    """Wrapper for the repo's ``dirstate`` filecache function.

    Builds the dirstate through ``orig`` and, when the repo carries an
    ``_fsmonitorstate`` attribute, swaps the dirstate's class for the
    fsmonitor-aware subclass and hands it the state and Watchman client.
    """
    dirstate = orig(self)
    if not util.safehasattr(self, '_fsmonitorstate'):
        # only override the dirstate when Watchman is available for the repo
        return dirstate
    dirstate.__class__ = makedirstate(dirstate.__class__)
    dirstate._fsmonitorinit(self._fsmonitorstate, self._watchmanclient)
    return dirstate
543
def extsetup(ui):
    """Install the extension-wide wrappers.

    The ``dirstate`` filecache property of localrepository is always wrapped;
    os.symlink is wrapped only when sys.platform is 'darwin'.
    """
    wrapfilecache(localrepo.localrepository, 'dirstate', wrapdirstate)
    if sys.platform != 'darwin':
        return
    # An assist for avoiding the dangling-symlink fsevents bug
    extensions.wrapfunction(os, 'symlink', wrapsymlink)
549
def wrapsymlink(orig, source, link_name):
    '''Create a symlink through ``orig``, then touch the link's parent dir.

    Touching the parent directory is meant to encourage fsevents
    notifications to work more correctly when a dangling symlink is created.
    The touch is best effort: it happens whether or not ``orig`` raises, and
    an OSError from it is ignored.

    Returns whatever ``orig(source, link_name)`` returns; exceptions raised
    by ``orig`` propagate unchanged.
    '''
    try:
        return orig(source, link_name)
    finally:
        # A bare relative name such as 'link' has an empty dirname; its
        # parent is the current directory, and os.utime('') would only
        # raise and silently skip the touch.
        parent = os.path.dirname(link_name) or os.curdir
        try:
            os.utime(parent, None)
        except OSError:
            # deliberately ignored: failing to touch the directory must not
            # turn a successful symlink creation into an error
            pass
560
561 def reposetup(ui, repo):
# Per-repository setup: decide whether fsmonitor can be used for this repo
# and, if so, attach the fsmonitor state and Watchman client to it and route
# its status() through overridestatus().
562 # We don't work with largefiles or eol (see _blacklist)
563 exts = extensions.enabled()
564 for ext in _blacklist:
565 if ext in exts:
566 ui.warn(_('The fsmonitor extension is incompatible with the %s '
567 'extension and has been disabled.\n') % ext)
568 return
569
# NOTE(review): indentation was lost in this listing; presumably everything
# below is nested under this dirstate check -- confirm against the real file.
570 if util.safehasattr(repo, 'dirstate'):
571 # We don't work with subrepos either. Note that we can get passed in
572 # e.g. a statichttprepo, which throws on trying to access the substate.
573 # XXX This sucks.
574 try:
575 # repo[None].substate can cause a dirstate parse, which is too
576 # slow. Instead, look for the .hgsubstate and .hgsub files.
577 if repo.wvfs.exists('.hgsubstate') or repo.wvfs.exists('.hgsub'):
578 return
579 except AttributeError:
580 return
581
# mode = off: leave the repo untouched
582 fsmonitorstate = state.state(repo)
583 if fsmonitorstate.mode == 'off':
584 return
585
# Creating the client may fail; report it and leave the repo unwrapped.
586 try:
587 client = watchmanclient.client(repo)
588 except Exception as ex:
589 _handleunavailable(ui, fsmonitorstate, ex)
590 return
591
592 repo._fsmonitorstate = fsmonitorstate
593 repo._watchmanclient = client
594
595 # at this point since fsmonitorstate wasn't present, repo.dirstate is
596 # not a fsmonitordirstate
597 repo.dirstate.__class__ = makedirstate(repo.dirstate.__class__)
598 # nuke the dirstate so that _fsmonitorinit and subsequent configuration
599 # changes take effect on it
600 del repo._filecache['dirstate']
601 delattr(repo.unfiltered(), 'dirstate')
602
# Subclass the repo's current class so that status() goes through
# overridestatus(), with the inherited status() passed in as ``orig``.
603 class fsmonitorrepo(repo.__class__):
604 def status(self, *args, **kwargs):
605 orig = super(fsmonitorrepo, self).status
606 return overridestatus(orig, self, *args, **kwargs)
607
608 repo.__class__ = fsmonitorrepo
609
def wrapfilecache(cls, propname, wrapper):
    """Wrap the function behind a filecache property.

    These properties can't be wrapped using the normal wrapfunction. The
    first class in ``cls``'s MRO that defines ``propname`` gets the ``func``
    attribute of that property replaced by a function that calls
    ``wrapper(origfunc, *args, **kwargs)``.

    Raises AttributeError when no class in the MRO defines ``propname``.
    This should eventually go into upstream Mercurial.
    """
    assert callable(wrapper)
    for klass in cls.__mro__:
        namespace = klass.__dict__
        if propname not in namespace:
            continue
        cached = namespace[propname]
        origfn = cached.func
        assert callable(origfn)

        def wrap(*args, **kwargs):
            return wrapper(origfn, *args, **kwargs)

        cached.func = wrap
        break
    else:
        # walked the whole MRO without finding the property
        raise AttributeError(
            _("type '%s' has no property '%s'") % (cls, propname))