util: create a context manager to handle timing
The context manager is pulled out of the timed decorator function, and
refactored to provide a stats instance, with added tests.
--- a/contrib/import-checker.py Wed Aug 01 23:08:18 2018 -0400
+++ b/contrib/import-checker.py Wed Aug 01 16:05:41 2018 +0200
@@ -36,6 +36,7 @@
'mercurial.pure.parsers',
# third-party imports should be directly imported
'mercurial.thirdparty',
+ 'mercurial.thirdparty.attr',
'mercurial.thirdparty.cbor',
'mercurial.thirdparty.cbor.cbor2',
'mercurial.thirdparty.zope',
--- a/mercurial/util.py Wed Aug 01 23:08:18 2018 -0400
+++ b/mercurial/util.py Wed Aug 01 16:05:41 2018 +0200
@@ -36,6 +36,9 @@
import warnings
import zlib
+from .thirdparty import (
+ attr,
+)
from . import (
encoding,
error,
@@ -2874,7 +2877,41 @@
(1, 0.000000001, _('%.3f ns')),
)
-_timenesting = [0]
+@attr.s
+class timedcmstats(object):
+ """Stats information produced by the timedcm context manager on entering."""
+
+ # the starting value of the timer as a float (meaning and resulution is
+ # platform dependent, see util.timer)
+ start = attr.ib(default=attr.Factory(lambda: timer()))
+ # the number of seconds as a floating point value; starts at 0, updated when
+ # the context is exited.
+ elapsed = attr.ib(default=0)
+ # the number of nested timedcm context managers.
+ level = attr.ib(default=1)
+
+ def __str__(self):
+ return timecount(self.elapsed) if self.elapsed else '<unknown>'
+
+@contextlib.contextmanager
+def timedcm():
+ """A context manager that produces timing information for a given context.
+
+ On entering a timedcmstats instance is produced.
+
+ This context manager is reentrant.
+
+ """
+ # track nested context managers
+ timedcm._nested += 1
+ timing_stats = timedcmstats(level=timedcm._nested)
+ try:
+ yield timing_stats
+ finally:
+ timing_stats.elapsed = timer() - timing_stats.start
+ timedcm._nested -= 1
+
+timedcm._nested = 0
def timed(func):
'''Report the execution time of a function call to stderr.
@@ -2888,18 +2925,12 @@
'''
def wrapper(*args, **kwargs):
- start = timer()
- indent = 2
- _timenesting[0] += indent
- try:
- return func(*args, **kwargs)
- finally:
- elapsed = timer() - start
- _timenesting[0] -= indent
- stderr = procutil.stderr
- stderr.write('%s%s: %s\n' %
- (' ' * _timenesting[0], func.__name__,
- timecount(elapsed)))
+ with timedcm() as time_stats:
+ result = func(*args, **kwargs)
+ stderr = procutil.stderr
+ stderr.write('%s%s: %s\n' % (
+ ' ' * time_stats.level * 2, func.__name__, time_stats))
+ return result
return wrapper
_sizeunits = (('m', 2**20), ('k', 2**10), ('g', 2**30),
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-util.py Wed Aug 01 16:05:41 2018 +0200
@@ -0,0 +1,135 @@
+# unit tests for mercuril.util utilities
+from __future__ import absolute_import
+
+import contextlib
+import itertools
+import unittest
+
+from mercurial import pycompat, util, utils
+
+@contextlib.contextmanager
+def mocktimer(incr=0.1, *additional_targets):
+ """Replaces util.timer and additional_targets with a mock
+
+ The timer starts at 0. On each call the time incremented by the value
+ of incr. If incr is an iterable, then the time is incremented by the
+ next value from that iterable, looping in a cycle when reaching the end.
+
+ additional_targets must be a sequence of (object, attribute_name) tuples;
+ the mock is set with setattr(object, attribute_name, mock).
+
+ """
+ time = [0]
+ try:
+ incr = itertools.cycle(incr)
+ except TypeError:
+ incr = itertools.repeat(incr)
+
+ def timer():
+ time[0] += next(incr)
+ return time[0]
+
+ # record original values
+ orig = util.timer
+ additional_origs = [(o, a, getattr(o, a)) for o, a in additional_targets]
+
+ # mock out targets
+ util.timer = timer
+ for obj, attr in additional_targets:
+ setattr(obj, attr, timer)
+
+ try:
+ yield
+ finally:
+ # restore originals
+ util.timer = orig
+ for args in additional_origs:
+ setattr(*args)
+
+# attr.s default factory for util.timedstats.start binds the timer we
+# need to mock out.
+_start_default = (util.timedcmstats.start.default, 'factory')
+
+@contextlib.contextmanager
+def capturestderr():
+ """Replace utils.procutil.stderr with a pycompat.bytesio instance
+
+ The instance is made available as the return value of __enter__.
+
+ This contextmanager is reentrant.
+
+ """
+ orig = utils.procutil.stderr
+ utils.procutil.stderr = pycompat.bytesio()
+ try:
+ yield utils.procutil.stderr
+ finally:
+ utils.procutil.stderr = orig
+
+class timedtests(unittest.TestCase):
+ def testtimedcmstatsstr(self):
+ stats = util.timedcmstats()
+ self.assertEqual(str(stats), '<unknown>')
+ stats.elapsed = 12.34
+ self.assertEqual(str(stats), util.timecount(12.34))
+
+ def testtimedcmcleanexit(self):
+ # timestamps 1, 4, elapsed time of 4 - 1 = 3
+ with mocktimer([1, 3], _start_default):
+ with util.timedcm() as stats:
+ # actual context doesn't matter
+ pass
+
+ self.assertEqual(stats.start, 1)
+ self.assertEqual(stats.elapsed, 3)
+ self.assertEqual(stats.level, 1)
+
+ def testtimedcmnested(self):
+ # timestamps 1, 3, 6, 10, elapsed times of 6 - 3 = 3 and 10 - 1 = 9
+ with mocktimer([1, 2, 3, 4], _start_default):
+ with util.timedcm() as outer_stats:
+ with util.timedcm() as inner_stats:
+ # actual context doesn't matter
+ pass
+
+ self.assertEqual(outer_stats.start, 1)
+ self.assertEqual(outer_stats.elapsed, 9)
+ self.assertEqual(outer_stats.level, 1)
+
+ self.assertEqual(inner_stats.start, 3)
+ self.assertEqual(inner_stats.elapsed, 3)
+ self.assertEqual(inner_stats.level, 2)
+
+ def testtimedcmexception(self):
+ # timestamps 1, 4, elapsed time of 4 - 1 = 3
+ with mocktimer([1, 3], _start_default):
+ try:
+ with util.timedcm() as stats:
+ raise ValueError()
+ except ValueError:
+ pass
+
+ self.assertEqual(stats.start, 1)
+ self.assertEqual(stats.elapsed, 3)
+ self.assertEqual(stats.level, 1)
+
+ def testtimeddecorator(self):
+ @util.timed
+ def testfunc(callcount=1):
+ callcount -= 1
+ if callcount:
+ testfunc(callcount)
+
+ # timestamps 1, 2, 3, 4, elapsed time of 3 - 2 = 1 and 4 - 1 = 3
+ with mocktimer(1, _start_default):
+ with capturestderr() as out:
+ testfunc(2)
+
+ self.assertEqual(out.getvalue(), (
+ b' testfunc: 1.000 s\n'
+ b' testfunc: 3.000 s\n'
+ ))
+
+if __name__ == '__main__':
+ import silenttestrunner
+ silenttestrunner.main(__name__)