comparison mercurial/httppeer.py @ 30465:40a1871eea5e

httppeer: use compression engine API for decompressing responses

In preparation for supporting multiple compression formats on the wire protocol, we need all users of the wire protocol to use compression engine APIs.

This commit ports the HTTP wire protocol client to use the compression engine API.

The code for handling the HTTPException is a bit hacky. Essentially, HTTPException could be thrown by any read() from the socket. However, as part of porting the API, we no longer have a generator wrapping the socket and we don't have a single place where we can trap the exception. We solve this by introducing a proxy class that intercepts read() and converts the exception appropriately.

In the future, we could introduce a new compression engine API that supports emitting a generator of decompressed chunks. This would eliminate the need for the proxy class. As I said when I introduced the decompressorreader() API, I'm not fond of it and would support transitioning to something better. This can be done as a follow-up, preferably once all the code is using the compression engine API and we have a better idea of the API needs of all the consumers.
author Gregory Szorc <gregory.szorc@gmail.com>
date Sun, 20 Nov 2016 13:55:53 -0800
parents e16e234b9ca3
children ef9f197188ec
comparison
equal deleted inserted replaced
30464:e16e234b9ca3 30465:40a1871eea5e
10 10
11 import errno 11 import errno
12 import os 12 import os
13 import socket 13 import socket
14 import tempfile 14 import tempfile
15 import zlib
16 15
17 from .i18n import _ 16 from .i18n import _
18 from .node import nullid 17 from .node import nullid
19 from . import ( 18 from . import (
20 bundle2, 19 bundle2,
28 27
29 httplib = util.httplib 28 httplib = util.httplib
30 urlerr = util.urlerr 29 urlerr = util.urlerr
31 urlreq = util.urlreq 30 urlreq = util.urlreq
32 31
33 def zgenerator(f): 32 # FUTURE: consider refactoring this API to use generators. This will
34 zd = zlib.decompressobj() 33 # require a compression engine API to emit generators.
34 def decompressresponse(response, engine):
35 try: 35 try:
36 for chunk in util.filechunkiter(f): 36 reader = engine.decompressorreader(response)
37 while chunk:
38 yield zd.decompress(chunk, 2**18)
39 chunk = zd.unconsumed_tail
40 except httplib.HTTPException: 37 except httplib.HTTPException:
41 raise IOError(None, _('connection ended unexpectedly')) 38 raise IOError(None, _('connection ended unexpectedly'))
42 yield zd.flush() 39
40 # We need to wrap reader.read() so HTTPException on subsequent
41 # reads is also converted.
42 origread = reader.read
43 class readerproxy(reader.__class__):
44 def read(self, *args, **kwargs):
45 try:
46 return origread(*args, **kwargs)
47 except httplib.HTTPException:
48 raise IOError(None, _('connection ended unexpectedly'))
49
50 reader.__class__ = readerproxy
51 return reader
43 52
44 class httppeer(wireproto.wirepeer): 53 class httppeer(wireproto.wirepeer):
45 def __init__(self, ui, path): 54 def __init__(self, ui, path):
46 self.path = path 55 self.path = path
47 self.caps = None 56 self.caps = None
200 if version_info > (0, 1): 209 if version_info > (0, 1):
201 raise error.RepoError(_("'%s' uses newer protocol %s") % 210 raise error.RepoError(_("'%s' uses newer protocol %s") %
202 (safeurl, version)) 211 (safeurl, version))
203 212
204 if _compressible: 213 if _compressible:
205 return util.chunkbuffer(zgenerator(resp)) 214 return decompressresponse(resp, util.compengines['zlib'])
206 215
207 return resp 216 return resp
208 217
209 def _call(self, cmd, **args): 218 def _call(self, cmd, **args):
210 fp = self._callstream(cmd, **args) 219 fp = self._callstream(cmd, **args)