comparison hgext/inotify/server.py @ 6239:39cfcef4f463

Add inotify extension
author Bryan O'Sullivan <bos@serpentine.com>
date Wed, 12 Mar 2008 15:30:11 -0700
parents
children c86207d41512
comparison
equal deleted inserted replaced
6236:ad6b123de1c7 6239:39cfcef4f463
1 # server.py - inotify status server
2 #
3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
5 #
6 # This software may be used and distributed according to the terms
7 # of the GNU General Public License, incorporated herein by reference.
8
9 from mercurial.i18n import gettext as _
10 from mercurial import osutil, ui, util
11 import common
12 import errno, os, select, socket, stat, struct, sys, time
13
14 try:
15 import hgext.inotify.linux as inotify
16 from hgext.inotify.linux import watcher
17 except ImportError:
18 print >> sys.stderr, '*** native support is required for this extension'
19 raise
20
# Raised by Server.__init__ when binding the inotify status socket fails
# with EADDRINUSE.
class AlreadyStartedException(Exception): pass
22
def join(a, b):
    '''Join two path fragments with a single '/'.

    An empty a yields b unchanged; no separator is inserted when a
    already ends with '/'.
    '''
    if not a:
        return b
    if a.endswith('/'):
        return a + b
    return a + '/' + b
29
# errno values tolerated while listing directories: the walkit helpers
# in walkrepodirs() and walk() swallow an OSError carrying one of these
# and re-raise anything else.
walk_ignored_errors = (errno.ENOENT, errno.ENAMETOOLONG)
31
def walkrepodirs(repo):
    '''Generate the full path of every directory to watch in this repo.

    The .hg directory is skipped, directories in which a .hg entry is
    seen are not reported (except the repository root itself), and
    directories matched by repo.dirstate._ignore are not entered.
    A directory is reported after the directories below it.
    '''
    prefix = repo.root + os.sep

    def visit(reldir, istop):
        # Yields (fullpath, hasrepo) pairs; hasrepo tells the parent
        # that a '.hg' entry was seen directly inside that directory.
        hasrepo = False
        try:
            for entry, kind in osutil.listdir(prefix + reldir):
                if kind != stat.S_IFDIR:
                    continue
                if entry == '.hg':
                    hasrepo = True
                    if istop:
                        continue
                    # nested repository: stop listing this directory
                    break
                child = join(reldir, entry)
                if repo.dirstate._ignore(child):
                    continue
                for found, nested in visit(child, False):
                    if not nested:
                        yield found, False
        except OSError:
            # sys.exc_info() keeps this parseable by every Python version
            if sys.exc_info()[1].errno not in walk_ignored_errors:
                raise
        yield prefix + reldir, hasrepo

    for fullpath, hasrepo in visit('', True):
        yield fullpath
57
def walk(repo, root):
    '''Like os.walk, but only yields regular files and symlinks.

    Generates (fullpath, dirs, files) for root (a path relative to
    repo.root, '' meaning the repository root) and then for the
    directories below it.  dirs is the list of subdirectory names and
    files is a list of (name, kind) pairs whose kind is stat.S_IFREG
    or stat.S_IFLNK.

    A '.hg' directory is never listed in dirs.  A subdirectory in
    which a '.hg' entry is seen is not reported, and subdirectories
    matched by repo.dirstate._ignore are not descended into.  OSErrors
    whose errno is in walk_ignored_errors are swallowed; others
    propagate.
    '''

    # This function is critical to performance during startup.

    reporoot = root == ''
    rootslash = repo.root + os.sep

    def walkit(root, reporoot):
        # Yields (hginside, fullpath, dirs, files); hginside lets the
        # caller drop directories that contain their own '.hg'.
        files, dirs = [], []
        hginside = False

        try:
            fullpath = rootslash + root
            for name, kind in osutil.listdir(fullpath):
                if kind == stat.S_IFDIR:
                    if name == '.hg':
                        hginside = True
                        if reporoot:
                            continue
                        else:
                            break
                    dirs.append(name)
                elif kind in (stat.S_IFREG, stat.S_IFLNK):
                    # no per-file path is built here: only the name and
                    # kind are reported, the caller joins as needed
                    files.append((name, kind))

            yield hginside, fullpath, dirs, files

            for subdir in dirs:
                path = join(root, subdir)
                if repo.dirstate._ignore(path):
                    continue
                for result in walkit(path, False):
                    if not result[0]:
                        yield result
        except OSError:
            # sys.exc_info() keeps this parseable by every Python version
            if sys.exc_info()[1].errno not in walk_ignored_errors:
                raise

    for result in walkit(root, reporoot):
        yield result[1:]
99
def _explain_watch_limit(ui, repo, count):
    '''Tell the user how to raise the inotify watch limit, then abort.

    Reads the current per-user limit from /proc, counts the directories
    reported by walkrepodirs(repo), warns with a suggested new limit
    and the command that sets it, and always finishes by raising
    util.Abort.  util.Abort is also raised when the /proc file does not
    exist; other IOErrors propagate.

    count is part of the signature but is not used.
    '''
    path = '/proc/sys/fs/inotify/max_user_watches'
    try:
        fp = open(path)
        try:
            limit = int(fp.read())
        finally:
            fp.close()
    except IOError:
        # sys.exc_info() keeps this parseable by every Python version
        if sys.exc_info()[1].errno != errno.ENOENT:
            raise
        raise util.Abort(_('this system does not seem to '
                           'support inotify'))
    ui.warn(_('*** the current per-user limit on the number '
              'of inotify watches is %s\n') % limit)
    ui.warn(_('*** this limit is too low to watch every '
              'directory in this repository\n'))
    ui.warn(_('*** counting directories: '))
    ndirs = len(list(walkrepodirs(repo)))
    ui.warn(_('found %d\n') % ndirs)
    # Start from at least 1: a configured limit of 0 would otherwise
    # stay 0 under doubling and the loop below would never terminate.
    newlimit = max(min(limit, 1024), 1)
    while newlimit < ((limit + ndirs) * 1.1):
        newlimit *= 2
    ui.warn(_('*** to raise the limit from %d to %d (run as root):\n') %
            (limit, newlimit))
    ui.warn(_('*** echo %d > %s\n') % (newlimit, path))
    raise util.Abort(_('cannot watch %s until inotify watch limit is raised')
                     % repo.root)
124
class Watcher(object):
    '''Keep an in-memory picture of working directory file status,
    updated from inotify events.

    self.tree maps path components to nested dicts (directories) or to
    one-character status strings (files); self.statustrees holds one
    such tree per character of statuskeys.  File events are queued in
    self.eventq and applied later by handle_timeout().
    '''

    poll_events = select.POLLIN
    statuskeys = 'almr!?'

    def __init__(self, ui, repo, master):
        '''Create the inotify watcher and perform the initial scan.

        Raises util.Abort if the inotify watcher cannot be created.
        '''
        self.ui = ui
        self.repo = repo
        self.wprefix = self.repo.wjoin('')
        self.timeout = None
        self.master = master
        self.mask = (
            inotify.IN_ATTRIB |
            inotify.IN_CREATE |
            inotify.IN_DELETE |
            inotify.IN_DELETE_SELF |
            inotify.IN_MODIFY |
            inotify.IN_MOVED_FROM |
            inotify.IN_MOVED_TO |
            inotify.IN_MOVE_SELF |
            inotify.IN_ONLYDIR |
            inotify.IN_UNMOUNT |
            0)
        try:
            self.watcher = watcher.Watcher()
        except OSError:
            # sys.exc_info() keeps this parseable by every Python version
            err = sys.exc_info()[1]
            raise util.Abort(_('inotify service not available: %s') %
                             err.strerror)
        self.threshold = watcher.Threshold(self.watcher)
        self.registered = True
        self.fileno = self.watcher.fileno

        self.repo.dirstate.__class__.inotifyserver = True

        self.tree = {}
        self.statcache = {}
        self.statustrees = dict([(s, {}) for s in self.statuskeys])

        self.watches = 0
        self.last_event = None

        # wpath -> list of pending event codes ('c', 'm', 'd')
        self.eventq = {}
        self.deferred = 0

        self.ds_info = self.dirstate_info()
        self.scan()

    def event_time(self):
        '''Return 'start' on the first call, otherwise the time since
        the previous call formatted as '+seconds'.'''
        last = self.last_event
        now = time.time()
        self.last_event = now

        if last is None:
            return 'start'
        delta = now - last
        if delta < 5:
            return '+%.3f' % delta
        if delta < 50:
            return '+%.2f' % delta
        return '+%.1f' % delta

    def dirstate_info(self):
        '''Return (mtime, inode) of the dirstate file, or (0, 0) if it
        does not exist.'''
        try:
            st = os.lstat(self.repo.join('dirstate'))
            return st.st_mtime, st.st_ino
        except OSError:
            if sys.exc_info()[1].errno != errno.ENOENT:
                raise
            return 0, 0

    def add_watch(self, path, mask):
        '''Watch path with the given mask unless it is already watched.

        ENOENT and ENOTDIR are ignored; ENOSPC is handed to
        _explain_watch_limit(), which aborts; other errors propagate.
        '''
        if not path:
            return
        if self.watcher.path(path) is None:
            if self.ui.debugflag:
                self.ui.note(_('watching %r\n') % path[len(self.wprefix):])
            try:
                self.watcher.add(path, mask)
                self.watches += 1
            except OSError:
                err = sys.exc_info()[1]
                if err.errno in (errno.ENOENT, errno.ENOTDIR):
                    return
                if err.errno != errno.ENOSPC:
                    raise
                _explain_watch_limit(self.ui, self.repo, self.watches)

    def setup(self):
        '''Watch repo.path for deletions and reload the dirstate if it
        changed.'''
        self.ui.note(_('watching directories under %r\n') % self.repo.root)
        self.add_watch(self.repo.path, inotify.IN_DELETE)
        self.check_dirstate()

    def wpath(self, evt):
        '''Return the path of evt relative to the working directory
        ('' for the repository root).

        Raises ValueError if evt.fullpath is not below self.wprefix.
        '''
        path = evt.fullpath
        if path == self.repo.root:
            return ''
        if path.startswith(self.wprefix):
            return path[len(self.wprefix):]
        # was a string exception, which cannot be raised or caught
        # meaningfully on current interpreters
        raise ValueError('event path %r is outside %r'
                         % (path, self.wprefix))

    def dir(self, tree, path):
        '''Return the subtree of tree for the '/'-separated path,
        creating missing levels on the way.'''
        if path:
            for name in path.split('/'):
                tree.setdefault(name, {})
                tree = tree[name]
        return tree

    def lookup(self, path, tree):
        '''Return the entry stored for path in tree.

        Returns 'x' if a component is missing and 'd' if a component
        other than the last is not a directory.
        '''
        if path:
            try:
                for name in path.split('/'):
                    tree = tree[name]
            except KeyError:
                return 'x'
            except TypeError:
                return 'd'
        return tree

    def split(self, path):
        '''Split path at its last '/' into (directory, name).'''
        c = path.rfind('/')
        if c == -1:
            return '', path
        return path[:c], path[c+1:]

    def filestatus(self, fn, st):
        '''Return the status character of fn.

        st is a (st_mode, st_size, st_mtime) tuple, or a false value
        when the file does not exist.  Possible results are '!', 'm',
        'l', 'n', 'i', or the type recorded in the dirstate map ('?'
        when fn has no entry).
        '''
        try:
            type_, mode, size, mtime = self.repo.dirstate._map[fn][:4]
        except KeyError:
            type_ = '?'
        if type_ == 'n':
            if not st:
                return '!'
            st_mode, st_size, st_mtime = st
            # S_IXUSR (octal 100): a change of the owner-execute bit
            # counts as a modification
            if size and (size != st_size or
                         (mode ^ st_mode) & stat.S_IXUSR):
                return 'm'
            if mtime != int(st_mtime):
                return 'l'
            return 'n'
        if type_ in 'ma' and not st:
            return '!'
        if type_ == '?' and self.repo.dirstate._ignore(fn):
            return 'i'
        return type_

    def updatestatus(self, wfn, st=None, status=None, oldstatus=None):
        '''Record the status of wfn in self.tree and self.statustrees.

        With st the status is computed by filestatus(); without it the
        cached stat is dropped and status (possibly derived from the
        previous status) is used.  oldstatus defaults to the entry
        currently stored in self.tree.
        '''
        if st:
            status = self.filestatus(wfn, st)
        else:
            self.statcache.pop(wfn, None)
        root, fn = self.split(wfn)
        d = self.dir(self.tree, root)
        if oldstatus is None:
            oldstatus = d.get(fn)
        isdir = False
        if oldstatus:
            try:
                if not status:
                    if oldstatus in 'almn':
                        status = '!'
                    elif oldstatus == 'r':
                        status = 'r'
            except TypeError:
                # oldstatus may be a dict left behind by a deleted
                # directory
                isdir = True
            else:
                if oldstatus in self.statuskeys and oldstatus != status:
                    del self.dir(self.statustrees[oldstatus], root)[fn]
        if self.ui.debugflag and oldstatus != status:
            if isdir:
                self.ui.note('status: %r dir(%d) -> %s\n' %
                             (wfn, len(oldstatus), status))
            else:
                self.ui.note('status: %r %s -> %s\n' %
                             (wfn, oldstatus, status))
        if not isdir:
            if status and status != 'i':
                d[fn] = status
                if status in self.statuskeys:
                    dd = self.dir(self.statustrees[status], root)
                    if oldstatus != status or fn not in dd:
                        dd[fn] = status
            else:
                d.pop(fn, None)

    def check_deleted(self, key):
        '''Drop entries of status tree key that are no longer in the
        dirstate.'''
        # Files that had been deleted but were present in the dirstate
        # may have vanished from the dirstate; we must clean them up.
        nuke = []
        for wfn, ignore in self.walk(key, self.statustrees[key]):
            if wfn not in self.repo.dirstate:
                nuke.append(wfn)
        for wfn in nuke:
            root, fn = self.split(wfn)
            del self.dir(self.statustrees[key], root)[fn]
            del self.dir(self.tree, root)[fn]

    def scan(self, topdir=''):
        '''Walk topdir, add watches for its directories and refresh the
        status of every file found there and of every dirstate entry
        below it.'''
        self.handle_timeout()
        ds = self.repo.dirstate._map.copy()
        self.add_watch(join(self.repo.root, topdir), self.mask)
        for root, dirs, entries in walk(self.repo, topdir):
            for d in dirs:
                self.add_watch(join(root, d), self.mask)
            wroot = root[len(self.wprefix):]
            # called for its side effect: creates the tree node
            self.dir(self.tree, wroot)
            for fn, kind in entries:
                wfn = join(wroot, fn)
                self.updatestatus(wfn, self.getstat(wfn))
                ds.pop(wfn, None)
        wtopdir = topdir
        if wtopdir and wtopdir[-1] != '/':
            wtopdir += '/'
        # what is left in ds was not seen on disk by the walk above
        for wfn, state in ds.iteritems():
            if not wfn.startswith(wtopdir):
                continue
            status = state[0]
            st = self.getstat(wfn)
            if status == 'r' and not st:
                self.updatestatus(wfn, st, status=status)
            else:
                self.updatestatus(wfn, st, oldstatus=status)
        self.check_deleted('!')
        self.check_deleted('r')

    def check_dirstate(self):
        '''Invalidate the dirstate and rescan if the dirstate file's
        (mtime, inode) changed since last recorded.'''
        ds_info = self.dirstate_info()
        if ds_info == self.ds_info:
            return
        self.ds_info = ds_info
        if not self.ui.debugflag:
            self.last_event = None
        self.ui.note(_('%s dirstate reload\n') % self.event_time())
        self.repo.dirstate.invalidate()
        self.scan()
        self.ui.note(_('%s end dirstate reload\n') % self.event_time())

    def walk(self, states, tree, prefix=''):
        '''Generate (path, status) for every file in tree whose status
        is in states, recursing into subtrees.'''
        # This is the "inner loop" when talking to the client.

        for name, val in tree.iteritems():
            path = join(prefix, name)
            try:
                if val in states:
                    yield path, val
            except TypeError:
                # val is a directory (dict), not a status string
                for p in self.walk(states, val, path):
                    yield p

    def update_hgignore(self):
        '''Reset the dirstate's ignore function and rescan.'''
        # An update of the ignore file can potentially change the
        # states of all unknown and ignored files.

        # XXX If the user has other ignore files outside the repo, or
        # changes their list of ignore files at run time, we'll
        # potentially never see changes to them.  We could get the
        # client to report to us what ignore data they're using.
        # But it's easier to do nothing than to open that can of
        # worms.

        if self.repo.dirstate.ignorefunc is not None:
            self.repo.dirstate.ignorefunc = None
            self.ui.note('rescanning due to .hgignore change\n')
            self.scan()

    def getstat(self, wpath):
        '''Return the cached or fresh stat tuple for wpath, or None if
        the file does not exist.'''
        try:
            return self.statcache[wpath]
        except KeyError:
            try:
                return self.stat(wpath)
            except OSError:
                if sys.exc_info()[1].errno != errno.ENOENT:
                    raise

    def stat(self, wpath):
        '''lstat wpath, cache and return (st_mode, st_size, st_mtime).

        On OSError the cache entry is dropped and the error re-raised.
        '''
        try:
            st = os.lstat(join(self.wprefix, wpath))
            ret = st.st_mode, st.st_size, st.st_mtime
            self.statcache[wpath] = ret
            return ret
        except OSError:
            self.statcache.pop(wpath, None)
            raise

    def created(self, wpath):
        '''Apply a deferred "created" event for wpath.'''
        if wpath == '.hgignore':
            self.update_hgignore()
        try:
            st = self.stat(wpath)
            if stat.S_ISREG(st[0]):
                self.updatestatus(wpath, st)
        except OSError:
            # the file may already be gone again
            pass

    def modified(self, wpath):
        '''Apply a deferred "modified" event for wpath.'''
        if wpath == '.hgignore':
            self.update_hgignore()
        try:
            st = self.stat(wpath)
            if stat.S_ISREG(st[0]):
                if self.repo.dirstate[wpath] in 'lmn':
                    self.updatestatus(wpath, st)
        except OSError:
            pass

    def deleted(self, wpath):
        '''Apply a deferred "deleted" event for wpath.'''
        if wpath == '.hgignore':
            self.update_hgignore()
        elif wpath.startswith('.hg/'):
            if wpath == '.hg/wlock':
                self.check_dirstate()
            return

        self.updatestatus(wpath, None)

    def schedule_work(self, wpath, evt):
        '''Queue event code evt for wpath and arm the 250ms timeout.

        An 'm' directly following a 'c' or 'm' for the same path is not
        queued again, but still counts as deferred.
        '''
        self.eventq.setdefault(wpath, [])
        prev = self.eventq[wpath]
        try:
            if prev and evt == 'm' and prev[-1] in 'cm':
                return
            self.eventq[wpath].append(evt)
        finally:
            self.deferred += 1
            self.timeout = 250

    def deferred_event(self, wpath, evt):
        '''Dispatch a queued event code to created/modified/deleted.'''
        if evt == 'c':
            self.created(wpath)
        elif evt == 'm':
            self.modified(wpath)
        elif evt == 'd':
            self.deleted(wpath)

    def process_create(self, wpath, evt):
        '''Handle a create/moved-to event: rescan directories, queue
        files.'''
        if self.ui.debugflag:
            self.ui.note(_('%s event: created %s\n') %
                         (self.event_time(), wpath))

        if evt.mask & inotify.IN_ISDIR:
            self.scan(wpath)
        else:
            self.schedule_work(wpath, 'c')

    def process_delete(self, wpath, evt):
        '''Handle a delete/moved-from event: rescan directories, queue
        files.'''
        if self.ui.debugflag:
            # now translated like the sibling process_* messages
            self.ui.note(_('%s event: deleted %s\n') %
                         (self.event_time(), wpath))

        if evt.mask & inotify.IN_ISDIR:
            self.scan(wpath)
        else:
            self.schedule_work(wpath, 'd')

    def process_modify(self, wpath, evt):
        '''Handle a modify/attrib event; directory events are ignored.'''
        if self.ui.debugflag:
            self.ui.note(_('%s event: modified %s\n') %
                         (self.event_time(), wpath))

        if not (evt.mask & inotify.IN_ISDIR):
            self.schedule_work(wpath, 'm')

    def process_unmount(self, evt):
        '''Warn that the filesystem was unmounted and exit.'''
        self.ui.warn(_('filesystem containing %s was unmounted\n') %
                     evt.fullpath)
        sys.exit(0)

    def handle_event(self, fd, event):
        '''Poll callback: read events if the threshold is reached,
        otherwise unregister from the poll set and arm the timeout.'''
        if self.ui.debugflag:
            self.ui.note('%s readable: %d bytes\n' %
                         (self.event_time(), self.threshold.readable()))
        if not self.threshold():
            if self.registered:
                if self.ui.debugflag:
                    self.ui.note('%s below threshold - unhooking\n' %
                                 (self.event_time()))
                self.master.poll.unregister(fd)
                self.registered = False
                self.timeout = 250
        else:
            self.read_events()

    def read_events(self, bufsize=None):
        '''Read pending inotify events and dispatch each one to the
        matching process_* method.'''
        events = self.watcher.read(bufsize)
        if self.ui.debugflag:
            self.ui.note('%s reading %d events\n' %
                         (self.event_time(), len(events)))
        for evt in events:
            wpath = self.wpath(evt)
            if evt.mask & inotify.IN_UNMOUNT:
                # process_unmount() takes the event only; passing wpath
                # as well raised TypeError instead of exiting cleanly
                self.process_unmount(evt)
            elif evt.mask & (inotify.IN_MODIFY | inotify.IN_ATTRIB):
                self.process_modify(wpath, evt)
            elif evt.mask & (inotify.IN_DELETE | inotify.IN_DELETE_SELF |
                             inotify.IN_MOVED_FROM):
                self.process_delete(wpath, evt)
            elif evt.mask & (inotify.IN_CREATE | inotify.IN_MOVED_TO):
                self.process_create(wpath, evt)

    def handle_timeout(self):
        '''Re-register with the poll set if needed, apply all queued
        events in path order and clear the timeout.'''
        if not self.registered:
            if self.ui.debugflag:
                self.ui.note('%s hooking back up with %d bytes readable\n' %
                             (self.event_time(), self.threshold.readable()))
            self.read_events(0)
            self.master.poll.register(self, select.POLLIN)
            self.registered = True

        if self.eventq:
            if self.ui.debugflag:
                self.ui.note('%s processing %d deferred events as %d\n' %
                             (self.event_time(), self.deferred,
                              len(self.eventq)))
            eventq = self.eventq.items()
            eventq.sort()
            for wpath, evts in eventq:
                for evt in evts:
                    self.deferred_event(wpath, evt)
            self.eventq.clear()
            self.deferred = 0
        self.timeout = None

    def shutdown(self):
        '''Close the inotify watcher.'''
        self.watcher.close()
548
class Server(object):
    '''Answer status queries from clients over a Unix socket, using
    the status trees kept by a Watcher.'''

    poll_events = select.POLLIN

    def __init__(self, ui, repo, watcher, timeout):
        '''Bind and listen on the 'inotify.sock' path given by repo.join.

        Raises AlreadyStartedException if the address is in use; other
        socket errors propagate.
        '''
        self.ui = ui
        self.repo = repo
        self.watcher = watcher
        self.timeout = timeout
        self.sock = socket.socket(socket.AF_UNIX)
        self.sockpath = self.repo.join('inotify.sock')
        try:
            self.sock.bind(self.sockpath)
        except socket.error:
            # sys.exc_info() keeps this parseable by every Python version
            err = sys.exc_info()[1]
            # don't leave the unusable socket open
            self.sock.close()
            if err.args[0] == errno.EADDRINUSE:
                raise AlreadyStartedException(_('could not start server: %s')
                                              % err.args[1])
            raise
        self.sock.listen(5)
        self.fileno = self.sock.fileno

    def handle_timeout(self):
        '''Nothing to do on timeout.'''
        pass

    def handle_event(self, fd, event):
        '''Poll callback: accept one client, answer its query and close
        the connection.'''
        sock, addr = self.sock.accept()
        try:
            self._respond(sock)
        finally:
            # close explicitly on every path (version mismatch, errors)
            # instead of relying on garbage collection
            sock.close()

    def _respond(self, sock):
        '''Read one query from sock and send back the matching file
        lists.'''
        cs = common.recvcs(sock)
        version = ord(cs.read(1))

        sock.sendall(chr(common.version))

        if version != common.version:
            self.ui.warn(_('received query from incompatible client '
                           'version %d\n') % version)
            return

        # NUL-separated names; the last field holds the wanted states
        names = cs.read().split('\0')

        states = names.pop()

        self.ui.note(_('answering query for %r\n') % states)

        if self.watcher.timeout:
            # We got a query while a rescan is pending.  Make sure we
            # rescan before responding, or we could give back a wrong
            # answer.
            self.watcher.handle_timeout()

        if not names:
            def genresult(states, tree):
                for fn, state in self.watcher.walk(states, tree):
                    yield fn
        else:
            def genresult(states, tree):
                for fn in names:
                    node = self.watcher.lookup(fn, tree)
                    try:
                        if node in states:
                            yield fn
                    except TypeError:
                        # node is a directory: report what is below it
                        for f, s in self.watcher.walk(states, node, fn):
                            yield f

        # a generator object is always true, so "cond and gen or []"
        # yields the generator exactly when cond holds
        results = ['\0'.join(r) for r in [
            genresult('l', self.watcher.statustrees['l']),
            genresult('m', self.watcher.statustrees['m']),
            genresult('a', self.watcher.statustrees['a']),
            genresult('r', self.watcher.statustrees['r']),
            genresult('!', self.watcher.statustrees['!']),
            '?' in states and genresult('?', self.watcher.statustrees['?']) or [],
            [],
            'c' in states and genresult('n', self.watcher.tree) or [],
            ]]

        try:
            try:
                sock.sendall(struct.pack(common.resphdrfmt,
                                         *map(len, results)))
                sock.sendall(''.join(results))
            finally:
                sock.shutdown(socket.SHUT_WR)
        except socket.error:
            # a client that went away is not an error
            if sys.exc_info()[1].args[0] != errno.EPIPE:
                raise

    def shutdown(self):
        '''Close the listening socket and remove its file; a missing
        file is ignored.'''
        self.sock.close()
        try:
            os.unlink(self.sockpath)
        except OSError:
            if sys.exc_info()[1].errno != errno.ENOENT:
                raise
641
class Master(object):
    '''Own the poll object plus the Watcher and Server, and run the
    event loop that dispatches to them.'''

    def __init__(self, ui, repo, timeout=None):
        '''Create the Watcher and the Server and register both file
        descriptors with the poll object.'''
        self.ui = ui
        self.repo = repo
        self.poll = select.poll()
        self.watcher = Watcher(ui, repo, self)
        self.server = Server(ui, repo, self.watcher, timeout)
        # file descriptor -> object handling its events
        self.table = {}
        for handler in (self.watcher, self.server):
            fileno = handler.fileno()
            self.table[fileno] = handler
            self.poll.register(fileno, handler.poll_events)

    def register(self, fd, mask):
        '''Register fd with the poll object.'''
        self.poll.register(fd, mask)

    def shutdown(self):
        '''Shut down every registered handler.'''
        for handler in self.table.values():
            handler.shutdown()

    def run(self):
        '''Set up the watcher, then dispatch poll events forever.

        Exits right after setup if TIME_STARTUP is set in the
        environment.  When a poll times out without events, the handler
        with the smallest timeout gets handle_timeout() called.
        '''
        self.watcher.setup()
        self.ui.note(_('finished setup\n'))
        if os.getenv('TIME_STARTUP'):
            sys.exit(0)
        while True:
            # pick the handler whose timeout expires first
            wait, due = None, None
            for handler in self.table.values():
                pending = handler.timeout
                if pending is None:
                    continue
                if wait is None or pending < wait:
                    wait, due = pending, handler
            try:
                if self.ui.debugflag:
                    if wait is None:
                        self.ui.note('polling: no timeout\n')
                    else:
                        self.ui.note('polling: %sms timeout\n' % wait)
                events = self.poll.poll(wait)
            except select.error:
                # interrupted by a signal: just poll again
                if sys.exc_info()[1].args[0] == errno.EINTR:
                    continue
                raise
            if not events:
                if due:
                    due.handle_timeout()
                continue
            for fd, event in events:
                self.table[fd].handle_event(fd, event)
689
def start(ui, repo):
    '''Start the inotify server in a forked child process.

    The Master is created before forking, so setup errors are raised in
    the calling process.  The parent returns the child's pid.  The
    child starts a new session, points stdin at /dev/null and
    stdout/stderr at the file named by the inotify.log setting
    (default /dev/null), runs the Master loop, and never returns.
    '''
    m = Master(ui, repo)
    sys.stdout.flush()
    sys.stderr.flush()

    pid = os.fork()
    if pid:
        return pid

    os.setsid()

    fd = os.open('/dev/null', os.O_RDONLY)
    os.dup2(fd, 0)
    if fd > 0:
        os.close(fd)

    fd = os.open(ui.config('inotify', 'log', '/dev/null'),
                 os.O_RDWR | os.O_CREAT | os.O_TRUNC)
    os.dup2(fd, 1)
    os.dup2(fd, 2)
    if fd > 2:
        os.close(fd)

    try:
        try:
            m.run()
        finally:
            m.shutdown()
    finally:
        # Must run even if shutdown() raises: otherwise the child would
        # return into the caller's code.
        os._exit(0)