Mercurial > hg
view tests/test-wireproto.py @ 46083:81c1f5d1801f
procutils: don't try to get `.buffer` if sys.stdin is None
While hunting down the following test failure of test-chg.t on Python 3, I stumbled
upon the case where `.buffer` is not available because sys.stdin is None.
--- /home/pulkit/repo/hg-committed/tests/test-chg.t
+++ /home/pulkit/repo/hg-committed/tests/test-chg.t.err
@@ -203,7 +203,31 @@
$ CHGDEBUG=1 chg version -q 0<&-
chg: debug: * stdio fds are missing (glob)
chg: debug: * execute original hg (glob)
- Mercurial Distributed SCM * (glob)
+ Traceback (most recent call last):
+ File "/tmp/hgtests.avspvsq4/install/bin/hg", line 43, in <module>
+ dispatch.run()
+ File "/usr/lib/python3.6/importlib/util.py", line 233, in
__getattribute__
+ self.__spec__.loader.exec_module(self)
+ File "<frozen importlib._bootstrap_external>", line 678, in
exec_module
+ File "<frozen importlib._bootstrap>", line 219, in
_call_with_frames_removed
+ File
"/tmp/hgtests.avspvsq4/install/lib/python/mercurial/dispatch.py", line
726, in <module>
+ class lazyaliasentry(object):
+ File
"/tmp/hgtests.avspvsq4/install/lib/python/mercurial/dispatch.py", line
737, in lazyaliasentry
+ @util.propertycache
+ File "/usr/lib/python3.6/importlib/util.py", line 233, in
__getattribute__
+ self.__spec__.loader.exec_module(self)
+ File "<frozen importlib._bootstrap_external>", line 678, in
exec_module
+ File "<frozen importlib._bootstrap>", line 219, in
_call_with_frames_removed
+ File "/tmp/hgtests.avspvsq4/install/lib/python/mercurial/util.py",
line 3473, in <module>
+ f=procutil.stderr,
+ File "/usr/lib/python3.6/importlib/util.py", line 233, in
__getattribute__
+ self.__spec__.loader.exec_module(self)
+ File "<frozen importlib._bootstrap_external>", line 678, in
exec_module
+ File "<frozen importlib._bootstrap>", line 219, in
_call_with_frames_removed
+ File
"/tmp/hgtests.avspvsq4/install/lib/python/mercurial/utils/procutil.py",
line 127, in <module>
+ stdin = sys.stdin.buffer
+ AttributeError: 'NoneType' object has no attribute 'buffer'
+ [1]
server lifecycle
----------------
Differential Revision: https://phab.mercurial-scm.org/D9500
author | Pulkit Goyal <7895pulkit@gmail.com> |
---|---|
date | Wed, 02 Dec 2020 13:55:17 +0530 |
parents | 2372284d9457 |
children | c424ff4807e6 |
line wrap: on
line source
from __future__ import absolute_import, print_function import sys from mercurial import ( error, pycompat, ui as uimod, util, wireprototypes, wireprotov1peer, wireprotov1server, ) from mercurial.utils import stringutil stringio = util.stringio class proto(object): def __init__(self, args): self.args = args self.name = 'dummyproto' def getargs(self, spec): args = self.args args.setdefault(b'*', {}) names = spec.split() return [args[n] for n in names] def checkperm(self, perm): pass wireprototypes.TRANSPORTS['dummyproto'] = { 'transport': 'dummy', 'version': 1, } class clientpeer(wireprotov1peer.wirepeer): def __init__(self, serverrepo, ui): self.serverrepo = serverrepo self.ui = ui def url(self): return b'test' def local(self): return None def peer(self): return self def canpush(self): return True def close(self): pass def capabilities(self): return [b'batch'] def _call(self, cmd, **args): args = pycompat.byteskwargs(args) res = wireprotov1server.dispatch(self.serverrepo, proto(args), cmd) if isinstance(res, wireprototypes.bytesresponse): return res.data elif isinstance(res, bytes): return res else: raise error.Abort('dummy client does not support response type') def _callstream(self, cmd, **args): return stringio(self._call(cmd, **args)) @wireprotov1peer.batchable def greet(self, name): f = wireprotov1peer.future() yield {b'name': mangle(name)}, f yield unmangle(f.value) class serverrepo(object): def __init__(self, ui): self.ui = ui def greet(self, name): return b"Hello, " + name def filtered(self, name): return self def mangle(s): return b''.join(pycompat.bytechr(ord(c) + 1) for c in pycompat.bytestr(s)) def unmangle(s): return b''.join(pycompat.bytechr(ord(c) - 1) for c in pycompat.bytestr(s)) def greet(repo, proto, name): return mangle(repo.greet(unmangle(name))) wireprotov1server.commands[b'greet'] = (greet, b'name') srv = serverrepo(uimod.ui()) clt = clientpeer(srv, uimod.ui()) def printb(data, end=b'\n'): out = getattr(sys.stdout, 'buffer', sys.stdout) 
out.write(data + end) out.flush() printb(clt.greet(b"Foobar")) with clt.commandexecutor() as e: fgreet1 = e.callcommand(b'greet', {b'name': b'Fo, =;:<o'}) fgreet2 = e.callcommand(b'greet', {b'name': b'Bar'}) printb( stringutil.pprint([f.result() for f in (fgreet1, fgreet2)], bprefix=True) )