comparison hgext/inotify/linuxserver.py @ 9933:2e7902158af9

inotify: create a common, OS-independent server entry point * rename server.py to linuxserver.py * create server.py: it will contain OS-independent logic for servers, and will import the right server depending on the OS * old server.server class is renamed to linuxserver.socketlistener
author Nicolas Dumazet <nicdumz.commits@gmail.com>
date Fri, 14 Aug 2009 08:19:49 -0400
parents hgext/inotify/server.py@8939900073a8
children 8fab31727037
comparison
equal deleted inserted replaced
9932:2fcbef9a349a 9933:2e7902158af9
1 # linuxserver.py - inotify status server for linux
2 #
3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
5 #
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2, incorporated herein by reference.
8
9 from mercurial.i18n import _
10 from mercurial import osutil, util
11 import common
12 import server
13 import errno, os, select, stat, sys, time
14
15 try:
16 import linux as inotify
17 from linux import watcher
18 except ImportError:
19 raise
20
21 def walkrepodirs(dirstate, absroot):
22 '''Iterate over all subdirectories of this repo.
23 Exclude the .hg directory, any nested repos, and ignored dirs.'''
24 def walkit(dirname, top):
25 fullpath = server.join(absroot, dirname)
26 try:
27 for name, kind in osutil.listdir(fullpath):
28 if kind == stat.S_IFDIR:
29 if name == '.hg':
30 if not top:
31 return
32 else:
33 d = server.join(dirname, name)
34 if dirstate._ignore(d):
35 continue
36 for subdir in walkit(d, False):
37 yield subdir
38 except OSError, err:
39 if err.errno not in server.walk_ignored_errors:
40 raise
41 yield fullpath
42
43 return walkit('', True)
44
45 def _explain_watch_limit(ui, dirstate, rootabs):
46 path = '/proc/sys/fs/inotify/max_user_watches'
47 try:
48 limit = int(file(path).read())
49 except IOError, err:
50 if err.errno != errno.ENOENT:
51 raise
52 raise util.Abort(_('this system does not seem to '
53 'support inotify'))
54 ui.warn(_('*** the current per-user limit on the number '
55 'of inotify watches is %s\n') % limit)
56 ui.warn(_('*** this limit is too low to watch every '
57 'directory in this repository\n'))
58 ui.warn(_('*** counting directories: '))
59 ndirs = len(list(walkrepodirs(dirstate, rootabs)))
60 ui.warn(_('found %d\n') % ndirs)
61 newlimit = min(limit, 1024)
62 while newlimit < ((limit + ndirs) * 1.1):
63 newlimit *= 2
64 ui.warn(_('*** to raise the limit from %d to %d (run as root):\n') %
65 (limit, newlimit))
66 ui.warn(_('*** echo %d > %s\n') % (newlimit, path))
67 raise util.Abort(_('cannot watch %s until inotify watch limit is raised')
68 % rootabs)
69
70 class pollable(object):
71 """
72 Interface to support polling.
73 The file descriptor returned by fileno() is registered to a polling
74 object.
75 Usage:
76 Every tick, check if an event has happened since the last tick:
77 * If yes, call handle_events
78 * If no, call handle_timeout
79 """
80 poll_events = select.POLLIN
81 instances = {}
82 poll = select.poll()
83
84 def fileno(self):
85 raise NotImplementedError
86
87 def handle_events(self, events):
88 raise NotImplementedError
89
90 def handle_timeout(self):
91 raise NotImplementedError
92
93 def shutdown(self):
94 raise NotImplementedError
95
96 def register(self, timeout):
97 fd = self.fileno()
98
99 pollable.poll.register(fd, pollable.poll_events)
100 pollable.instances[fd] = self
101
102 self.registered = True
103 self.timeout = timeout
104
105 def unregister(self):
106 pollable.poll.unregister(self)
107 self.registered = False
108
109 @classmethod
110 def run(cls):
111 while True:
112 timeout = None
113 timeobj = None
114 for obj in cls.instances.itervalues():
115 if obj.timeout is not None and (timeout is None or obj.timeout < timeout):
116 timeout, timeobj = obj.timeout, obj
117 try:
118 events = cls.poll.poll(timeout)
119 except select.error, err:
120 if err[0] == errno.EINTR:
121 continue
122 raise
123 if events:
124 by_fd = {}
125 for fd, event in events:
126 by_fd.setdefault(fd, []).append(event)
127
128 for fd, events in by_fd.iteritems():
129 cls.instances[fd].handle_pollevents(events)
130
131 elif timeobj:
132 timeobj.handle_timeout()
133
def eventaction(code):
    """
    Decorator factory for the repowatcher event handlers.

    code is the one-letter tag recorded in self.lastevent for the path
    being handled.  A handler tagged 'm' is skipped when the path's last
    recorded tag is 'c' or 'm'.  Otherwise the tag is stored,
    self.timeout is set to 250 and the wrapped handler is called.
    """
    def decorator(f):
        def wrapper(self, wpath):
            seen = self.lastevent
            redundant = (code == 'm' and wpath in seen
                         and seen[wpath] in 'cm')
            if not redundant:
                seen[wpath] = code
                self.timeout = 250

                f(self, wpath)

        wrapper.func_name = f.func_name
        return wrapper
    return decorator
151
152 class repowatcher(server.repowatcher, pollable):
153 """
154 Watches inotify events
155 """
156 mask = (
157 inotify.IN_ATTRIB |
158 inotify.IN_CREATE |
159 inotify.IN_DELETE |
160 inotify.IN_DELETE_SELF |
161 inotify.IN_MODIFY |
162 inotify.IN_MOVED_FROM |
163 inotify.IN_MOVED_TO |
164 inotify.IN_MOVE_SELF |
165 inotify.IN_ONLYDIR |
166 inotify.IN_UNMOUNT |
167 0)
168
169 def __init__(self, ui, dirstate, root):
170 server.repowatcher.__init__(self, ui, dirstate, root)
171
172 self.lastevent = {}
173 try:
174 self.watcher = watcher.watcher()
175 except OSError, err:
176 raise util.Abort(_('inotify service not available: %s') %
177 err.strerror)
178 self.threshold = watcher.threshold(self.watcher)
179 self.fileno = self.watcher.fileno
180 self.register(timeout=None)
181
182 self.handle_timeout()
183 self.scan()
184
185 def event_time(self):
186 last = self.last_event
187 now = time.time()
188 self.last_event = now
189
190 if last is None:
191 return 'start'
192 delta = now - last
193 if delta < 5:
194 return '+%.3f' % delta
195 if delta < 50:
196 return '+%.2f' % delta
197 return '+%.1f' % delta
198
199 def add_watch(self, path, mask):
200 if not path:
201 return
202 if self.watcher.path(path) is None:
203 if self.ui.debugflag:
204 self.ui.note(_('watching %r\n') % path[self.prefixlen:])
205 try:
206 self.watcher.add(path, mask)
207 except OSError, err:
208 if err.errno in (errno.ENOENT, errno.ENOTDIR):
209 return
210 if err.errno != errno.ENOSPC:
211 raise
212 _explain_watch_limit(self.ui, self.dirstate, self.wprefix)
213
214 def setup(self):
215 self.ui.note(_('watching directories under %r\n') % self.wprefix)
216 self.add_watch(self.wprefix + '.hg', inotify.IN_DELETE)
217 self.check_dirstate()
218
219 def scan(self, topdir=''):
220 ds = self.dirstate._map.copy()
221 self.add_watch(server.join(self.wprefix, topdir), self.mask)
222 for root, dirs, files in server.walk(self.dirstate, self.wprefix,
223 topdir):
224 for d in dirs:
225 self.add_watch(server.join(root, d), self.mask)
226 wroot = root[self.prefixlen:]
227 for fn in files:
228 wfn = server.join(wroot, fn)
229 self.updatefile(wfn, self.getstat(wfn))
230 ds.pop(wfn, None)
231 wtopdir = topdir
232 if wtopdir and wtopdir[-1] != '/':
233 wtopdir += '/'
234 for wfn, state in ds.iteritems():
235 if not wfn.startswith(wtopdir):
236 continue
237 try:
238 st = self.stat(wfn)
239 except OSError:
240 status = state[0]
241 self.deletefile(wfn, status)
242 else:
243 self.updatefile(wfn, st)
244 self.check_deleted('!')
245 self.check_deleted('r')
246
247 @eventaction('c')
248 def created(self, wpath):
249 if wpath == '.hgignore':
250 self.update_hgignore()
251 try:
252 st = self.stat(wpath)
253 if stat.S_ISREG(st[0]):
254 self.updatefile(wpath, st)
255 except OSError:
256 pass
257
258 @eventaction('m')
259 def modified(self, wpath):
260 if wpath == '.hgignore':
261 self.update_hgignore()
262 try:
263 st = self.stat(wpath)
264 if stat.S_ISREG(st[0]):
265 if self.dirstate[wpath] in 'lmn':
266 self.updatefile(wpath, st)
267 except OSError:
268 pass
269
270 @eventaction('d')
271 def deleted(self, wpath):
272 if wpath == '.hgignore':
273 self.update_hgignore()
274 elif wpath.startswith('.hg/'):
275 if wpath == '.hg/wlock':
276 self.check_dirstate()
277 return
278
279 self.deletefile(wpath, self.dirstate[wpath])
280
281 def process_create(self, wpath, evt):
282 if self.ui.debugflag:
283 self.ui.note(_('%s event: created %s\n') %
284 (self.event_time(), wpath))
285
286 if evt.mask & inotify.IN_ISDIR:
287 self.scan(wpath)
288 else:
289 self.created(wpath)
290
291 def process_delete(self, wpath, evt):
292 if self.ui.debugflag:
293 self.ui.note(_('%s event: deleted %s\n') %
294 (self.event_time(), wpath))
295
296 if evt.mask & inotify.IN_ISDIR:
297 tree = self.tree.dir(wpath)
298 todelete = [wfn for wfn, ignore in tree.walk('?')]
299 for fn in todelete:
300 self.deletefile(fn, '?')
301 self.scan(wpath)
302 else:
303 self.deleted(wpath)
304
305 def process_modify(self, wpath, evt):
306 if self.ui.debugflag:
307 self.ui.note(_('%s event: modified %s\n') %
308 (self.event_time(), wpath))
309
310 if not (evt.mask & inotify.IN_ISDIR):
311 self.modified(wpath)
312
313 def process_unmount(self, evt):
314 self.ui.warn(_('filesystem containing %s was unmounted\n') %
315 evt.fullpath)
316 sys.exit(0)
317
318 def handle_pollevents(self, events):
319 if self.ui.debugflag:
320 self.ui.note(_('%s readable: %d bytes\n') %
321 (self.event_time(), self.threshold.readable()))
322 if not self.threshold():
323 if self.registered:
324 if self.ui.debugflag:
325 self.ui.note(_('%s below threshold - unhooking\n') %
326 (self.event_time()))
327 self.unregister()
328 self.timeout = 250
329 else:
330 self.read_events()
331
332 def read_events(self, bufsize=None):
333 events = self.watcher.read(bufsize)
334 if self.ui.debugflag:
335 self.ui.note(_('%s reading %d events\n') %
336 (self.event_time(), len(events)))
337 for evt in events:
338 assert evt.fullpath.startswith(self.wprefix)
339 wpath = evt.fullpath[self.prefixlen:]
340
341 # paths have been normalized, wpath never ends with a '/'
342
343 if wpath.startswith('.hg/') and evt.mask & inotify.IN_ISDIR:
344 # ignore subdirectories of .hg/ (merge, patches...)
345 continue
346
347 if evt.mask & inotify.IN_UNMOUNT:
348 self.process_unmount(wpath, evt)
349 elif evt.mask & (inotify.IN_MODIFY | inotify.IN_ATTRIB):
350 self.process_modify(wpath, evt)
351 elif evt.mask & (inotify.IN_DELETE | inotify.IN_DELETE_SELF |
352 inotify.IN_MOVED_FROM):
353 self.process_delete(wpath, evt)
354 elif evt.mask & (inotify.IN_CREATE | inotify.IN_MOVED_TO):
355 self.process_create(wpath, evt)
356
357 self.lastevent.clear()
358
359 def handle_timeout(self):
360 if not self.registered:
361 if self.ui.debugflag:
362 self.ui.note(_('%s hooking back up with %d bytes readable\n') %
363 (self.event_time(), self.threshold.readable()))
364 self.read_events(0)
365 self.register(timeout=None)
366
367 self.timeout = None
368
369 def shutdown(self):
370 self.watcher.close()
371
372 def debug(self):
373 """
374 Returns a sorted list of relatives paths currently watched,
375 for debugging purposes.
376 """
377 return sorted(tuple[0][self.prefixlen:] for tuple in self.watcher)
378
379 class socketlistener(server.socketlistener, pollable):
380 """
381 Listens for client queries on unix socket inotify.sock
382 """
383 def __init__(self, ui, root, repowatcher, timeout):
384 server.socketlistener.__init__(self, ui, root, repowatcher, timeout)
385 self.register(timeout=timeout)
386
387 def handle_timeout(self):
388 pass
389
390 def handle_pollevents(self, events):
391 for e in events:
392 self.accept_connection()
393
394 def shutdown(self):
395 self.sock.close()
396 try:
397 os.unlink(self.sockpath)
398 if self.realsockpath:
399 os.unlink(self.realsockpath)
400 os.rmdir(os.path.dirname(self.realsockpath))
401 except OSError, err:
402 if err.errno != errno.ENOENT:
403 raise
404
405 def answer_stat_query(self, cs):
406 if self.repowatcher.timeout:
407 # We got a query while a rescan is pending. Make sure we
408 # rescan before responding, or we could give back a wrong
409 # answer.
410 self.repowatcher.handle_timeout()
411 return server.socketlistener.answer_stat_query(self, cs)
412
class master(object):
    """
    Ties together the repository watcher and the socket listener and
    drives the shared polling loop.
    """
    def __init__(self, ui, dirstate, root, timeout=None):
        self.ui = ui
        watch = repowatcher(ui, dirstate, root)
        self.repowatcher = watch
        # the listener answers queries from the watcher created above
        self.socketlistener = socketlistener(ui, root, watch, timeout)

    def shutdown(self):
        """
        Shut down every object known to the polling machinery.
        """
        for polled in pollable.instances.itervalues():
            polled.shutdown()

    def run(self):
        """
        Finish the watcher setup, then enter the polling loop.  When the
        TIME_STARTUP environment variable is set, exit right after
        setup instead.
        """
        self.repowatcher.setup()
        self.ui.note(_('finished setup\n'))
        timing_only = os.getenv('TIME_STARTUP')
        if timing_only:
            sys.exit(0)
        else:
            pollable.run()