hgext/inotify/linuxserver.py
changeset 20622 352abbb0be88
parent 20621 5beb49fd5958
child 20623 84f6bc03b4e5
equal deleted inserted replaced
20621:5beb49fd5958 20622:352abbb0be88
     1 # linuxserver.py - inotify status server for linux
       
     2 #
       
     3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
       
     4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
       
     5 #
       
     6 # This software may be used and distributed according to the terms of the
       
     7 # GNU General Public License version 2 or any later version.
       
     8 
       
     9 from mercurial.i18n import _
       
    10 from mercurial import osutil, util, error
       
    11 import server
       
    12 import errno, os, select, stat, sys, time
       
    13 
       
    14 try:
       
    15     import linux as inotify
       
    16     from linux import watcher
       
    17 except ImportError:
       
    18     raise
       
    19 
       
    20 def walkrepodirs(dirstate, absroot):
       
    21     '''Iterate over all subdirectories of this repo.
       
    22     Exclude the .hg directory, any nested repos, and ignored dirs.'''
       
    23     def walkit(dirname, top):
       
    24         fullpath = server.join(absroot, dirname)
       
    25         try:
       
    26             for name, kind in osutil.listdir(fullpath):
       
    27                 if kind == stat.S_IFDIR:
       
    28                     if name == '.hg':
       
    29                         if not top:
       
    30                             return
       
    31                     else:
       
    32                         d = server.join(dirname, name)
       
    33                         if dirstate._ignore(d):
       
    34                             continue
       
    35                         for subdir in walkit(d, False):
       
    36                             yield subdir
       
    37         except OSError, err:
       
    38             if err.errno not in server.walk_ignored_errors:
       
    39                 raise
       
    40         yield fullpath
       
    41 
       
    42     return walkit('', True)
       
    43 
       
    44 def _explain_watch_limit(ui, dirstate, rootabs):
       
    45     path = '/proc/sys/fs/inotify/max_user_watches'
       
    46     try:
       
    47         limit = int(util.readfile(path))
       
    48     except IOError, err:
       
    49         if err.errno != errno.ENOENT:
       
    50             raise
       
    51         raise util.Abort(_('this system does not seem to '
       
    52                            'support inotify'))
       
    53     ui.warn(_('*** the current per-user limit on the number '
       
    54               'of inotify watches is %s\n') % limit)
       
    55     ui.warn(_('*** this limit is too low to watch every '
       
    56               'directory in this repository\n'))
       
    57     ui.warn(_('*** counting directories: '))
       
    58     ndirs = len(list(walkrepodirs(dirstate, rootabs)))
       
    59     ui.warn(_('found %d\n') % ndirs)
       
    60     newlimit = min(limit, 1024)
       
    61     while newlimit < ((limit + ndirs) * 1.1):
       
    62         newlimit *= 2
       
    63     ui.warn(_('*** to raise the limit from %d to %d (run as root):\n') %
       
    64             (limit, newlimit))
       
    65     ui.warn(_('***  echo %d > %s\n') % (newlimit, path))
       
    66     raise util.Abort(_('cannot watch %s until inotify watch limit is raised')
       
    67                      % rootabs)
       
    68 
       
    69 class pollable(object):
       
    70     """
       
    71     Interface to support polling.
       
    72     The file descriptor returned by fileno() is registered to a polling
       
    73     object.
       
    74     Usage:
       
    75         Every tick, check if an event has happened since the last tick:
       
    76         * If yes, call handle_events
       
    77         * If no, call handle_timeout
       
    78     """
       
    79     poll_events = select.POLLIN
       
    80     instances = {}
       
    81     poll = select.poll()
       
    82 
       
    83     def fileno(self):
       
    84         raise NotImplementedError
       
    85 
       
    86     def handle_events(self, events):
       
    87         raise NotImplementedError
       
    88 
       
    89     def handle_timeout(self):
       
    90         raise NotImplementedError
       
    91 
       
    92     def shutdown(self):
       
    93         raise NotImplementedError
       
    94 
       
    95     def register(self, timeout):
       
    96         fd = self.fileno()
       
    97 
       
    98         pollable.poll.register(fd, pollable.poll_events)
       
    99         pollable.instances[fd] = self
       
   100 
       
   101         self.registered = True
       
   102         self.timeout = timeout
       
   103 
       
   104     def unregister(self):
       
   105         pollable.poll.unregister(self)
       
   106         self.registered = False
       
   107 
       
   108     @classmethod
       
   109     def run(cls):
       
   110         while True:
       
   111             timeout = None
       
   112             timeobj = None
       
   113             for obj in cls.instances.itervalues():
       
   114                 if obj.timeout is not None and (timeout is None
       
   115                                                 or obj.timeout < timeout):
       
   116                     timeout, timeobj = obj.timeout, obj
       
   117             try:
       
   118                 events = cls.poll.poll(timeout)
       
   119             except select.error, err:
       
   120                 if err.args[0] == errno.EINTR:
       
   121                     continue
       
   122                 raise
       
   123             if events:
       
   124                 by_fd = {}
       
   125                 for fd, event in events:
       
   126                     by_fd.setdefault(fd, []).append(event)
       
   127 
       
   128                 for fd, events in by_fd.iteritems():
       
   129                     cls.instances[fd].handle_pollevents(events)
       
   130 
       
   131             elif timeobj:
       
   132                 timeobj.handle_timeout()
       
   133 
       
   134 def eventaction(code):
       
   135     """
       
   136     Decorator to help handle events in repowatcher
       
   137     """
       
   138     def decorator(f):
       
   139         def wrapper(self, wpath):
       
   140             if code == 'm' and wpath in self.lastevent and \
       
   141                 self.lastevent[wpath] in 'cm':
       
   142                 return
       
   143             self.lastevent[wpath] = code
       
   144             self.timeout = 250
       
   145 
       
   146             f(self, wpath)
       
   147 
       
   148         wrapper.func_name = f.func_name
       
   149         return wrapper
       
   150     return decorator
       
   151 
       
   152 class repowatcher(server.repowatcher, pollable):
       
   153     """
       
   154     Watches inotify events
       
   155     """
       
   156     mask = (
       
   157         inotify.IN_ATTRIB |
       
   158         inotify.IN_CREATE |
       
   159         inotify.IN_DELETE |
       
   160         inotify.IN_DELETE_SELF |
       
   161         inotify.IN_MODIFY |
       
   162         inotify.IN_MOVED_FROM |
       
   163         inotify.IN_MOVED_TO |
       
   164         inotify.IN_MOVE_SELF |
       
   165         inotify.IN_ONLYDIR |
       
   166         inotify.IN_UNMOUNT |
       
   167         0)
       
   168 
       
   169     def __init__(self, ui, dirstate, root):
       
   170         server.repowatcher.__init__(self, ui, dirstate, root)
       
   171 
       
   172         self.lastevent = {}
       
   173         self.dirty = False
       
   174         try:
       
   175             self.watcher = watcher.watcher()
       
   176         except OSError, err:
       
   177             raise util.Abort(_('inotify service not available: %s') %
       
   178                              err.strerror)
       
   179         self.threshold = watcher.threshold(self.watcher)
       
   180         self.fileno = self.watcher.fileno
       
   181         self.register(timeout=None)
       
   182 
       
   183         self.handle_timeout()
       
   184         self.scan()
       
   185 
       
   186     def event_time(self):
       
   187         last = self.last_event
       
   188         now = time.time()
       
   189         self.last_event = now
       
   190 
       
   191         if last is None:
       
   192             return 'start'
       
   193         delta = now - last
       
   194         if delta < 5:
       
   195             return '+%.3f' % delta
       
   196         if delta < 50:
       
   197             return '+%.2f' % delta
       
   198         return '+%.1f' % delta
       
   199 
       
   200     def add_watch(self, path, mask):
       
   201         if not path:
       
   202             return
       
   203         if self.watcher.path(path) is None:
       
   204             if self.ui.debugflag:
       
   205                 self.ui.note(_('watching %r\n') % path[self.prefixlen:])
       
   206             try:
       
   207                 self.watcher.add(path, mask)
       
   208             except OSError, err:
       
   209                 if err.errno in (errno.ENOENT, errno.ENOTDIR):
       
   210                     return
       
   211                 if err.errno != errno.ENOSPC:
       
   212                     raise
       
   213                 _explain_watch_limit(self.ui, self.dirstate, self.wprefix)
       
   214 
       
   215     def setup(self):
       
   216         self.ui.note(_('watching directories under %r\n') % self.wprefix)
       
   217         self.add_watch(self.wprefix + '.hg', inotify.IN_DELETE)
       
   218 
       
   219     def scan(self, topdir=''):
       
   220         ds = self.dirstate._map.copy()
       
   221         self.add_watch(server.join(self.wprefix, topdir), self.mask)
       
   222         for root, dirs, files in server.walk(self.dirstate, self.wprefix,
       
   223                                              topdir):
       
   224             for d in dirs:
       
   225                 self.add_watch(server.join(root, d), self.mask)
       
   226             wroot = root[self.prefixlen:]
       
   227             for fn in files:
       
   228                 wfn = server.join(wroot, fn)
       
   229                 self.updatefile(wfn, self.getstat(wfn))
       
   230                 ds.pop(wfn, None)
       
   231         wtopdir = topdir
       
   232         if wtopdir and wtopdir[-1] != '/':
       
   233             wtopdir += '/'
       
   234         for wfn, state in ds.iteritems():
       
   235             if not wfn.startswith(wtopdir):
       
   236                 continue
       
   237             try:
       
   238                 st = self.stat(wfn)
       
   239             except OSError:
       
   240                 status = state[0]
       
   241                 self.deletefile(wfn, status)
       
   242             else:
       
   243                 self.updatefile(wfn, st)
       
   244         self.check_deleted('!')
       
   245         self.check_deleted('r')
       
   246 
       
   247     @eventaction('c')
       
   248     def created(self, wpath):
       
   249         if wpath == '.hgignore':
       
   250             self.update_hgignore()
       
   251         try:
       
   252             st = self.stat(wpath)
       
   253             if stat.S_ISREG(st[0]) or stat.S_ISLNK(st[0]):
       
   254                 self.updatefile(wpath, st)
       
   255         except OSError:
       
   256             pass
       
   257 
       
   258     @eventaction('m')
       
   259     def modified(self, wpath):
       
   260         if wpath == '.hgignore':
       
   261             self.update_hgignore()
       
   262         try:
       
   263             st = self.stat(wpath)
       
   264             if stat.S_ISREG(st[0]):
       
   265                 if self.dirstate[wpath] in 'lmn':
       
   266                     self.updatefile(wpath, st)
       
   267         except OSError:
       
   268             pass
       
   269 
       
   270     @eventaction('d')
       
   271     def deleted(self, wpath):
       
   272         if wpath == '.hgignore':
       
   273             self.update_hgignore()
       
   274         elif wpath.startswith('.hg/'):
       
   275             return
       
   276 
       
   277         self.deletefile(wpath, self.dirstate[wpath])
       
   278 
       
   279     def process_create(self, wpath, evt):
       
   280         if self.ui.debugflag:
       
   281             self.ui.note(_('%s event: created %s\n') %
       
   282                          (self.event_time(), wpath))
       
   283 
       
   284         if evt.mask & inotify.IN_ISDIR:
       
   285             self.scan(wpath)
       
   286         else:
       
   287             self.created(wpath)
       
   288 
       
   289     def process_delete(self, wpath, evt):
       
   290         if self.ui.debugflag:
       
   291             self.ui.note(_('%s event: deleted %s\n') %
       
   292                          (self.event_time(), wpath))
       
   293 
       
   294         if evt.mask & inotify.IN_ISDIR:
       
   295             tree = self.tree.dir(wpath)
       
   296             todelete = [wfn for wfn, ignore in tree.walk('?')]
       
   297             for fn in todelete:
       
   298                 self.deletefile(fn, '?')
       
   299             self.scan(wpath)
       
   300         else:
       
   301             self.deleted(wpath)
       
   302 
       
   303     def process_modify(self, wpath, evt):
       
   304         if self.ui.debugflag:
       
   305             self.ui.note(_('%s event: modified %s\n') %
       
   306                          (self.event_time(), wpath))
       
   307 
       
   308         if not (evt.mask & inotify.IN_ISDIR):
       
   309             self.modified(wpath)
       
   310 
       
   311     def process_unmount(self, evt):
       
   312         self.ui.warn(_('filesystem containing %s was unmounted\n') %
       
   313                      evt.fullpath)
       
   314         sys.exit(0)
       
   315 
       
   316     def handle_pollevents(self, events):
       
   317         if self.ui.debugflag:
       
   318             self.ui.note(_('%s readable: %d bytes\n') %
       
   319                          (self.event_time(), self.threshold.readable()))
       
   320         if not self.threshold():
       
   321             if self.registered:
       
   322                 if self.ui.debugflag:
       
   323                     self.ui.note(_('%s below threshold - unhooking\n') %
       
   324                                  (self.event_time()))
       
   325                 self.unregister()
       
   326                 self.timeout = 250
       
   327         else:
       
   328             self.read_events()
       
   329 
       
   330     def read_events(self, bufsize=None):
       
   331         events = self.watcher.read(bufsize)
       
   332         if self.ui.debugflag:
       
   333             self.ui.note(_('%s reading %d events\n') %
       
   334                          (self.event_time(), len(events)))
       
   335         for evt in events:
       
   336             if evt.fullpath == self.wprefix[:-1]:
       
   337                 # events on the root of the repository
       
   338                 # itself, e.g. permission changes or repository move
       
   339                 continue
       
   340             assert evt.fullpath.startswith(self.wprefix)
       
   341             wpath = evt.fullpath[self.prefixlen:]
       
   342 
       
   343             # paths have been normalized, wpath never ends with a '/'
       
   344 
       
   345             if wpath.startswith('.hg/') and evt.mask & inotify.IN_ISDIR:
       
   346                 # ignore subdirectories of .hg/ (merge, patches...)
       
   347                 continue
       
   348             if wpath == ".hg/wlock":
       
   349                 if evt.mask & inotify.IN_DELETE:
       
   350                     self.dirstate.invalidate()
       
   351                     self.dirty = False
       
   352                     self.scan()
       
   353                 elif evt.mask & inotify.IN_CREATE:
       
   354                     self.dirty = True
       
   355             else:
       
   356                 if self.dirty:
       
   357                     continue
       
   358 
       
   359                 if evt.mask & inotify.IN_UNMOUNT:
       
   360                     self.process_unmount(wpath, evt)
       
   361                 elif evt.mask & (inotify.IN_MODIFY | inotify.IN_ATTRIB):
       
   362                     self.process_modify(wpath, evt)
       
   363                 elif evt.mask & (inotify.IN_DELETE | inotify.IN_DELETE_SELF |
       
   364                                  inotify.IN_MOVED_FROM):
       
   365                     self.process_delete(wpath, evt)
       
   366                 elif evt.mask & (inotify.IN_CREATE | inotify.IN_MOVED_TO):
       
   367                     self.process_create(wpath, evt)
       
   368 
       
   369         self.lastevent.clear()
       
   370 
       
   371     def handle_timeout(self):
       
   372         if not self.registered:
       
   373             if self.ui.debugflag:
       
   374                 self.ui.note(_('%s hooking back up with %d bytes readable\n') %
       
   375                              (self.event_time(), self.threshold.readable()))
       
   376             self.read_events(0)
       
   377             self.register(timeout=None)
       
   378 
       
   379         self.timeout = None
       
   380 
       
   381     def shutdown(self):
       
   382         self.watcher.close()
       
   383 
       
   384     def debug(self):
       
   385         """
       
   386         Returns a sorted list of relatives paths currently watched,
       
   387         for debugging purposes.
       
   388         """
       
   389         return sorted(tuple[0][self.prefixlen:] for tuple in self.watcher)
       
   390 
       
   391 class socketlistener(server.socketlistener, pollable):
       
   392     """
       
   393     Listens for client queries on unix socket inotify.sock
       
   394     """
       
   395     def __init__(self, ui, root, repowatcher, timeout):
       
   396         server.socketlistener.__init__(self, ui, root, repowatcher, timeout)
       
   397         self.register(timeout=timeout)
       
   398 
       
   399     def handle_timeout(self):
       
   400         raise server.TimeoutException
       
   401 
       
   402     def handle_pollevents(self, events):
       
   403         for e in events:
       
   404             self.accept_connection()
       
   405 
       
   406     def shutdown(self):
       
   407         self.sock.close()
       
   408         self.sock.cleanup()
       
   409 
       
   410     def answer_stat_query(self, cs):
       
   411         if self.repowatcher.timeout:
       
   412             # We got a query while a rescan is pending.  Make sure we
       
   413             # rescan before responding, or we could give back a wrong
       
   414             # answer.
       
   415             self.repowatcher.handle_timeout()
       
   416         return server.socketlistener.answer_stat_query(self, cs)
       
   417 
       
   418 class master(object):
       
   419     def __init__(self, ui, dirstate, root, timeout=None):
       
   420         self.ui = ui
       
   421         self.repowatcher = repowatcher(ui, dirstate, root)
       
   422         self.socketlistener = socketlistener(ui, root, self.repowatcher,
       
   423                                              timeout)
       
   424 
       
   425     def shutdown(self):
       
   426         for obj in pollable.instances.itervalues():
       
   427             try:
       
   428                 obj.shutdown()
       
   429             except error.SignalInterrupt:
       
   430                 pass
       
   431 
       
   432     def run(self):
       
   433         self.repowatcher.setup()
       
   434         self.ui.note(_('finished setup\n'))
       
   435         if os.getenv('TIME_STARTUP'):
       
   436             sys.exit(0)
       
   437         pollable.run()