wireproto: split streamres into legacy and modern case
A couple of commands currently require transmission of uncompressed
frames with the old MIME type. Split this case from streamres into
a new streamres_legacy class. Streamline the remaining code accordingly.
Add a new flag to streamres to request uncompressed streams. This is
useful for sending data that is already compressed like a pre-built
bundle. Expect clients to support uncompressed data. For older clients,
zlib will still be used.
Differential Revision: https://phab.mercurial-scm.org/D1862
--- a/hgext/largefiles/proto.py Fri Jan 19 12:33:03 2018 -0800
+++ b/hgext/largefiles/proto.py Fri Jan 12 10:59:58 2018 +0100
@@ -75,7 +75,7 @@
yield '%d\n' % length
for chunk in util.filechunkiter(f):
yield chunk
- return wireproto.streamres(gen=generator())
+ return wireproto.streamres_legacy(gen=generator())
def statlfile(repo, proto, sha):
'''Server command for checking if a largefile is present - returns '2\n' if
--- a/mercurial/hgweb/protocol.py Fri Jan 19 12:33:03 2018 -0800
+++ b/mercurial/hgweb/protocol.py Fri Jan 12 10:59:58 2018 +0100
@@ -102,25 +102,20 @@
urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
urlreq.quote(self.req.env.get('REMOTE_USER', '')))
- def responsetype(self, v1compressible=False):
+ def responsetype(self, prefer_uncompressed):
"""Determine the appropriate response type and compression settings.
- The ``v1compressible`` argument states whether the response with
- application/mercurial-0.1 media types should be zlib compressed.
-
Returns a tuple of (mediatype, compengine, engineopts).
"""
- # For now, if it isn't compressible in the old world, it's never
- # compressible. We can change this to send uncompressed 0.2 payloads
- # later.
- if not v1compressible:
- return HGTYPE, None, None
-
# Determine the response media type and compression engine based
# on the request parameters.
protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
if '0.2' in protocaps:
+ # All clients are expected to support uncompressed data.
+ if prefer_uncompressed:
+ return HGTYPE2, util._noopengine(), {}
+
# Default as defined by wire protocol spec.
compformats = ['zlib', 'none']
for cap in protocaps:
@@ -155,7 +150,7 @@
def call(repo, req, cmd):
p = webproto(req, repo.ui)
- def genversion2(gen, compress, engine, engineopts):
+ def genversion2(gen, engine, engineopts):
# application/mercurial-0.2 always sends a payload header
# identifying the compression engine.
name = engine.wireprotosupport().name
@@ -163,28 +158,27 @@
yield struct.pack('B', len(name))
yield name
- if compress:
- for chunk in engine.compressstream(gen, opts=engineopts):
- yield chunk
- else:
- for chunk in gen:
- yield chunk
+ for chunk in gen:
+ yield chunk
rsp = wireproto.dispatch(repo, p, cmd)
if isinstance(rsp, bytes):
req.respond(HTTP_OK, HGTYPE, body=rsp)
return []
+ elif isinstance(rsp, wireproto.streamres_legacy):
+ gen = rsp.gen
+ req.respond(HTTP_OK, HGTYPE)
+ return gen
elif isinstance(rsp, wireproto.streamres):
gen = rsp.gen
# This code for compression should not be streamres specific. It
# is here because we only compress streamres at the moment.
- mediatype, engine, engineopts = p.responsetype(rsp.v1compressible)
+ mediatype, engine, engineopts = p.responsetype(rsp.prefer_uncompressed)
+ gen = engine.compressstream(gen, engineopts)
- if mediatype == HGTYPE and rsp.v1compressible:
- gen = engine.compressstream(gen, engineopts)
- elif mediatype == HGTYPE2:
- gen = genversion2(gen, rsp.v1compressible, engine, engineopts)
+ if mediatype == HGTYPE2:
+ gen = genversion2(gen, engine, engineopts)
req.respond(HTTP_OK, mediatype)
return gen
--- a/mercurial/sshserver.py Fri Jan 19 12:33:03 2018 -0800
+++ b/mercurial/sshserver.py Fri Jan 12 10:59:58 2018 +0100
@@ -105,6 +105,7 @@
handlers = {
str: sendresponse,
wireproto.streamres: sendstream,
+ wireproto.streamres_legacy: sendstream,
wireproto.pushres: sendpushresponse,
wireproto.pusherr: sendpusherror,
wireproto.ooberror: sendooberror,
--- a/mercurial/wireproto.py Fri Jan 19 12:33:03 2018 -0800
+++ b/mercurial/wireproto.py Fri Jan 12 10:59:58 2018 +0100
@@ -522,15 +522,26 @@
Accepts a generator containing chunks of data to be sent to the client.
- ``v1compressible`` indicates whether this data can be compressed to
- "version 1" clients (technically: HTTP peers using
- application/mercurial-0.1 media type). This flag should NOT be used on
- new commands because new clients should support a more modern compression
- mechanism.
+ ``prefer_uncompressed`` indicates that the data is expected to be
+ uncompressable and that the stream should therefore use the ``none``
+ engine.
"""
- def __init__(self, gen=None, v1compressible=False):
+ def __init__(self, gen=None, prefer_uncompressed=False):
self.gen = gen
- self.v1compressible = v1compressible
+ self.prefer_uncompressed = prefer_uncompressed
+
+class streamres_legacy(object):
+ """wireproto reply: uncompressed binary stream
+
+ The call was successful and the result is a stream.
+
+ Accepts a generator containing chunks of data to be sent to the client.
+
+ Like ``streamres``, but sends an uncompressed data for "version 1" clients
+ using the application/mercurial-0.1 media type.
+ """
+ def __init__(self, gen=None):
+ self.gen = gen
class pushres(object):
"""wireproto reply: success with simple integer return
@@ -802,7 +813,7 @@
missingheads=repo.heads())
cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
gen = iter(lambda: cg.read(32768), '')
- return streamres(gen=gen, v1compressible=True)
+ return streamres(gen=gen)
@wireprotocommand('changegroupsubset', 'bases heads')
def changegroupsubset(repo, proto, bases, heads):
@@ -812,7 +823,7 @@
missingheads=heads)
cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
gen = iter(lambda: cg.read(32768), '')
- return streamres(gen=gen, v1compressible=True)
+ return streamres(gen=gen)
@wireprotocommand('debugwireargs', 'one two *')
def debugwireargs(repo, proto, one, two, others):
@@ -877,8 +888,8 @@
advargs.append(('hint', exc.hint))
bundler.addpart(bundle2.bundlepart('error:abort',
manargs, advargs))
- return streamres(gen=bundler.getchunks(), v1compressible=True)
- return streamres(gen=chunks, v1compressible=True)
+ return streamres(gen=bundler.getchunks())
+ return streamres(gen=chunks)
@wireprotocommand('heads')
def heads(repo, proto):
@@ -955,7 +966,7 @@
capability with a value representing the version and flags of the repo
it is serving. Client checks to see if it understands the format.
'''
- return streamres(streamclone.generatev1wireproto(repo))
+ return streamres_legacy(streamclone.generatev1wireproto(repo))
@wireprotocommand('unbundle', 'heads')
def unbundle(repo, proto, heads):
@@ -990,7 +1001,7 @@
if util.safehasattr(r, 'addpart'):
# The return looks streamable, we are in the bundle2 case and
# should return a stream.
- return streamres(gen=r.getchunks())
+ return streamres_legacy(gen=r.getchunks())
return pushres(r)
finally:
@@ -1054,4 +1065,4 @@
manargs, advargs))
except error.PushRaced as exc:
bundler.newpart('error:pushraced', [('message', str(exc))])
- return streamres(gen=bundler.getchunks())
+ return streamres_legacy(gen=bundler.getchunks())