comparison mercurial/wireproto.py @ 30206:d105195436c0

wireproto: compress data from a generator

Currently, the "getbundle" wire protocol command obtains a generator of data, converts it to a util.chunkbuffer, then converts it back to a generator via the protocol's groupchunks() implementation.

For the SSH protocol, groupchunks() simply reads 4kb chunks then write()s the data to a file descriptor.

For the HTTP protocol, groupchunks() reads 32kb chunks, feeds those into a zlib compressor, emits compressed data as it is available, and that is sent to the WSGI layer, where it is likely turned into HTTP chunked transfer chunks as is or further buffered and turned into a larger chunk.

For both the SSH and HTTP protocols, there is inefficiency from using util.chunkbuffer.

For SSH, emitting consistent 4kb chunks sounds nice. However, the file descriptor it is writing to is almost certainly buffered. That means that a Python .write() probably doesn't translate into exactly what is written to the I/O layer.

For HTTP, we're going through an intermediate layer to zlib compress data. So all util.chunkbuffer is doing is ensuring that the chunks we feed into the zlib compressor are of uniform size. This means more CPU time in Python buffering and emitting chunks in util.chunkbuffer but fewer function calls to zlib.

This patch introduces and implements a new wire protocol abstract method: compresschunks(). It is like groupchunks() except it operates on a generator instead of something with a .read(). The SSH implementation simply proxies chunks. The HTTP implementation uses zlib compression. To avoid duplicate code, the HTTP groupchunks() has been reimplemented in terms of compresschunks().

To prove this all works, the "getbundle" wire protocol command has been switched to compresschunks(). This removes the util.chunkbuffer from that command. Now, data essentially streams straight from the changegroup emitter to the wire, possibly through a zlib compressor. Generators all the way, baby.

There were slim to no performance changes on the server as measured with the mozilla-central repository. This is likely because CPU time is dominated by reading revlogs, producing the changegroup, and zlib compressing the output stream. Still, this brings us a little closer to our ideal of using generators everywhere.
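
The two compresschunks() behaviours described above can be sketched roughly as follows. This is an illustration only, not the code from this changeset: the function names and the level parameter are hypothetical, and it is written for Python 3 bytes rather than the Python 2 strings Mercurial used at the time.

```python
import zlib


def compresschunks_passthrough(chunks):
    """SSH-style behaviour (assumed): proxy each chunk unchanged."""
    for chunk in chunks:
        yield chunk


def compresschunks_zlib(chunks, level=-1):
    """HTTP-style behaviour (assumed): stream chunks through one zlib compressor."""
    z = zlib.compressobj(level)
    for chunk in chunks:
        data = z.compress(chunk)
        # The compressor buffers internally, so a call may return no bytes yet.
        if data:
            yield data
    # Emit whatever the compressor is still holding.
    yield z.flush()
```

Because both functions consume an iterable directly, a caller can hand them the changegroup generator without first wrapping it in an object that exposes .read().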
author Gregory Szorc <gregory.szorc@gmail.com>
date Sun, 16 Oct 2016 11:10:21 -0700
parents 3e86261bf110
children 2add671bf55b
--- a/mercurial/wireproto.py	30205:b4074417b661
+++ b/mercurial/wireproto.py	30206:d105195436c0
@@ -80,10 +80,18 @@
 
     def groupchunks(self, fh):
         """Generator of chunks to send to the client.
 
         Some protocols may have compressed the contents.
+        """
+        raise NotImplementedError()
+
+    def compresschunks(self, chunks):
+        """Generator of possible compressed chunks to send to the client.
+
+        This is like ``groupchunks()`` except it accepts a generator as
+        its argument.
         """
         raise NotImplementedError()
 
 class remotebatch(peer.batcher):
     '''batches the queued calls; uses as few roundtrips as possible'''
@@ -771,13 +779,11 @@
     if not bundle1allowed(repo, 'pull'):
         if not exchange.bundle2requested(opts.get('bundlecaps')):
             return ooberror(bundle2required)
 
     chunks = exchange.getbundlechunks(repo, 'serve', **opts)
-    # TODO avoid util.chunkbuffer() here since it is adding overhead to
-    # what is fundamentally a generator proxying operation.
-    return streamres(proto.groupchunks(util.chunkbuffer(chunks)))
+    return streamres(proto.compresschunks(chunks))
 
 @wireprotocommand('heads')
 def heads(repo, proto):
     h = repo.heads()
     return encodelist(h) + "\n"
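
For a sense of the work the removed util.chunkbuffer() call was doing, the sketch below regroups a generator's output into uniform-size pieces. It is a simplified stand-in written for this note, not Mercurial's util.chunkbuffer; the name rechunk and its size parameter are hypothetical.

```python
def rechunk(chunks, size):
    """Regroup arbitrary-sized byte chunks into pieces of `size` bytes.

    Only the final piece may be shorter than `size`.
    """
    buf = bytearray()
    for chunk in chunks:
        # Every input byte is copied into the buffer and copied out again.
        buf.extend(chunk)
        while len(buf) >= size:
            yield bytes(buf[:size])
            del buf[:size]
    if buf:
        yield bytes(buf)
```

Each byte is copied into and back out of a Python-level buffer, which is the kind of overhead the removed TODO comment refers to. Passing the generator straight to compresschunks() avoids that regrouping step.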