view mercurial/wireprotoserver.py @ 46582:b0a3ca02d17a
copies-rust: implement PartialEq manually
Now that we know that each (dest, rev) pair has at most one CopySource, we
can simplify comparison a lot.
This "simple" step buys back a good share of the previous slowdown in some cases:
Repo Case Source-Rev Dest-Rev # of revisions old time new time Difference Factor time per rev
---------------------------------------------------------------------------------------------------------------------------------------------------------------
mozilla-try x00000_revs_x00000_added_x000_copies 9b2a99adc05e 8e29777b48e6 : 382065 revs, 43.304637 s, 34.443661 s, -8.860976 s, × 0.7954, 90 µs/rev
Full benchmark:
Repo Case Source-Rev Dest-Rev # of revisions old time new time Difference Factor time per rev
---------------------------------------------------------------------------------------------------------------------------------------------------------------
mercurial x_revs_x_added_0_copies ad6b123de1c7 39cfcef4f463 : 1 revs, 0.000043 s, 0.000043 s, +0.000000 s, × 1.0000, 43 µs/rev
mercurial x_revs_x_added_x_copies 2b1c78674230 0c1d10351869 : 6 revs, 0.000114 s, 0.000117 s, +0.000003 s, × 1.0263, 19 µs/rev
mercurial x000_revs_x000_added_x_copies 81f8ff2a9bf2 dd3267698d84 : 1032 revs, 0.004937 s, 0.004892 s, -0.000045 s, × 0.9909, 4 µs/rev
pypy x_revs_x_added_0_copies aed021ee8ae8 099ed31b181b : 9 revs, 0.000339 s, 0.000196 s, -0.000143 s, × 0.5782, 21 µs/rev
pypy x_revs_x000_added_0_copies 4aa4e1f8e19a 359343b9ac0e : 1 revs, 0.000049 s, 0.000050 s, +0.000001 s, × 1.0204, 50 µs/rev
pypy x_revs_x_added_x_copies ac52eb7bbbb0 72e022663155 : 7 revs, 0.000202 s, 0.000117 s, -0.000085 s, × 0.5792, 16 µs/rev
pypy x_revs_x00_added_x_copies c3b14617fbd7 ace7255d9a26 : 1 revs, 0.000409 s, 0.000322 s, -0.000087 s, × 0.7873, 322 µs/rev
pypy x_revs_x000_added_x000_copies df6f7a526b60 a83dc6a2d56f : 6 revs, 0.011984 s, 0.011949 s, -0.000035 s, × 0.9971, 1991 µs/rev
pypy x000_revs_xx00_added_0_copies 89a76aede314 2f22446ff07e : 4785 revs, 0.050820 s, 0.050802 s, -0.000018 s, × 0.9996, 10 µs/rev
pypy x000_revs_x000_added_x_copies 8a3b5bfd266e 2c68e87c3efe : 6780 revs, 0.087953 s, 0.088090 s, +0.000137 s, × 1.0016, 12 µs/rev
pypy x000_revs_x000_added_x000_copies 89a76aede314 7b3dda341c84 : 5441 revs, 0.062902 s, 0.062079 s, -0.000823 s, × 0.9869, 11 µs/rev
pypy x0000_revs_x_added_0_copies d1defd0dc478 c9cb1334cc78 : 43645 revs, 0.679234 s, 0.635337 s, -0.043897 s, × 0.9354, 14 µs/rev
pypy x0000_revs_xx000_added_0_copies bf2c629d0071 4ffed77c095c : 2 revs, 0.013095 s, 0.013262 s, +0.000167 s, × 1.0128, 6631 µs/rev
pypy x0000_revs_xx000_added_x000_copies 08ea3258278e d9fa043f30c0 : 11316 revs, 0.120910 s, 0.120085 s, -0.000825 s, × 0.9932, 10 µs/rev
netbeans x_revs_x_added_0_copies fb0955ffcbcd a01e9239f9e7 : 2 revs, 0.000087 s, 0.000085 s, -0.000002 s, × 0.9770, 42 µs/rev
netbeans x_revs_x000_added_0_copies 6f360122949f 20eb231cc7d0 : 2 revs, 0.000107 s, 0.000110 s, +0.000003 s, × 1.0280, 55 µs/rev
netbeans x_revs_x_added_x_copies 1ada3faf6fb6 5a39d12eecf4 : 3 revs, 0.000186 s, 0.000177 s, -0.000009 s, × 0.9516, 59 µs/rev
netbeans x_revs_x00_added_x_copies 35be93ba1e2c 9eec5e90c05f : 9 revs, 0.000754 s, 0.000743 s, -0.000011 s, × 0.9854, 82 µs/rev
netbeans x000_revs_xx00_added_0_copies eac3045b4fdd 51d4ae7f1290 : 1421 revs, 0.010443 s, 0.010168 s, -0.000275 s, × 0.9737, 7 µs/rev
netbeans x000_revs_x000_added_x_copies e2063d266acd 6081d72689dc : 1533 revs, 0.015697 s, 0.015946 s, +0.000249 s, × 1.0159, 10 µs/rev
netbeans x000_revs_x000_added_x000_copies ff453e9fee32 411350406ec2 : 5750 revs, 0.063528 s, 0.062712 s, -0.000816 s, × 0.9872, 10 µs/rev
netbeans x0000_revs_xx000_added_x000_copies 588c2d1ced70 1aad62e59ddd : 66949 revs, 0.545515 s, 0.523832 s, -0.021683 s, × 0.9603, 7 µs/rev
mozilla-central x_revs_x_added_0_copies 3697f962bb7b 7015fcdd43a2 : 2 revs, 0.000089 s, 0.000090 s, +0.000001 s, × 1.0112, 45 µs/rev
mozilla-central x_revs_x000_added_0_copies dd390860c6c9 40d0c5bed75d : 8 revs, 0.000265 s, 0.000264 s, -0.000001 s, × 0.9962, 33 µs/rev
mozilla-central x_revs_x_added_x_copies 8d198483ae3b 14207ffc2b2f : 9 revs, 0.000381 s, 0.000187 s, -0.000194 s, × 0.4908, 20 µs/rev
mozilla-central x_revs_x00_added_x_copies 98cbc58cc6bc 446a150332c3 : 7 revs, 0.000672 s, 0.000665 s, -0.000007 s, × 0.9896, 95 µs/rev
mozilla-central x_revs_x000_added_x000_copies 3c684b4b8f68 0a5e72d1b479 : 3 revs, 0.003497 s, 0.003556 s, +0.000059 s, × 1.0169, 1185 µs/rev
mozilla-central x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 6 revs, 0.073204 s, 0.071345 s, -0.001859 s, × 0.9746, 11890 µs/rev
mozilla-central x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 1593 revs, 0.006482 s, 0.006551 s, +0.000069 s, × 1.0106, 4 µs/rev
mozilla-central x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 41 revs, 0.005066 s, 0.005078 s, +0.000012 s, × 1.0024, 123 µs/rev
mozilla-central x000_revs_x000_added_x000_copies 7c97034feb78 4407bd0c6330 : 7839 revs, 0.065707 s, 0.065823 s, +0.000116 s, × 1.0018, 8 µs/rev
mozilla-central x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 615 revs, 0.026800 s, 0.027050 s, +0.000250 s, × 1.0093, 43 µs/rev
mozilla-central x0000_revs_xx000_added_x000_copies f78c615a656c 96a38b690156 : 30263 revs, 0.203856 s, 0.202443 s, -0.001413 s, × 0.9931, 6 µs/rev
mozilla-central x00000_revs_x0000_added_x0000_copies 6832ae71433c 4c222a1d9a00 : 153721 revs, 1.293394 s, 1.261583 s, -0.031811 s, × 0.9754, 8 µs/rev
mozilla-central x00000_revs_x00000_added_x000_copies 76caed42cf7c 1daa622bbe42 : 204976 revs, 1.698239 s, 1.643869 s, -0.054370 s, × 0.9680, 8 µs/rev
mozilla-try x_revs_x_added_0_copies aaf6dde0deb8 9790f499805a : 2 revs, 0.000875 s, 0.000868 s, -0.000007 s, × 0.9920, 434 µs/rev
mozilla-try x_revs_x000_added_0_copies d8d0222927b4 5bb8ce8c7450 : 2 revs, 0.000891 s, 0.000887 s, -0.000004 s, × 0.9955, 443 µs/rev
mozilla-try x_revs_x_added_x_copies 092fcca11bdb 936255a0384a : 4 revs, 0.000292 s, 0.000168 s, -0.000124 s, × 0.5753, 42 µs/rev
mozilla-try x_revs_x00_added_x_copies b53d2fadbdb5 017afae788ec : 2 revs, 0.003939 s, 0.001160 s, -0.002779 s, × 0.2945, 580 µs/rev
mozilla-try x_revs_x000_added_x000_copies 20408ad61ce5 6f0ee96e21ad : 1 revs, 0.033027 s, 0.033016 s, -0.000011 s, × 0.9997, 33016 µs/rev
mozilla-try x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 6 revs, 0.073703 s, 0.073312 s, -0.000391 s, × 0.9947, 12218 µs/rev
mozilla-try x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 1593 revs, 0.006469 s, 0.006485 s, +0.000016 s, × 1.0025, 4 µs/rev
mozilla-try x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 41 revs, 0.005278 s, 0.005494 s, +0.000216 s, × 1.0409, 134 µs/rev
mozilla-try x000_revs_x000_added_x000_copies 1346fd0130e4 4c65cbdabc1f : 6657 revs, 0.064995 s, 0.064879 s, -0.000116 s, × 0.9982, 9 µs/rev
mozilla-try x0000_revs_x_added_0_copies 63519bfd42ee a36a2a865d92 : 40314 revs, 0.301041 s, 0.301469 s, +0.000428 s, × 1.0014, 7 µs/rev
mozilla-try x0000_revs_x_added_x_copies 9fe69ff0762d bcabf2a78927 : 38690 revs, 0.285575 s, 0.297113 s, +0.011538 s, × 1.0404, 7 µs/rev
mozilla-try x0000_revs_xx000_added_x_copies 156f6e2674f2 4d0f2c178e66 : 8598 revs, 0.085597 s, 0.085890 s, +0.000293 s, × 1.0034, 9 µs/rev
mozilla-try x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 615 revs, 0.027118 s, 0.027718 s, +0.000600 s, × 1.0221, 45 µs/rev
mozilla-try x0000_revs_xx000_added_x000_copies 89294cd501d9 7ccb2fc7ccb5 : 97052 revs, 2.119204 s, 2.048949 s, -0.070255 s, × 0.9668, 21 µs/rev
mozilla-try x0000_revs_x0000_added_x0000_copies e928c65095ed e951f4ad123a : 52031 revs, 0.701479 s, 0.685924 s, -0.015555 s, × 0.9778, 13 µs/rev
mozilla-try x00000_revs_x_added_0_copies 6a320851d377 1ebb79acd503 : 363753 revs, 4.482399 s, 4.482891 s, +0.000492 s, × 1.0001, 12 µs/rev
mozilla-try x00000_revs_x00000_added_0_copies dc8a3ca7010e d16fde900c9c : 34414 revs, 0.574082 s, 0.577633 s, +0.003551 s, × 1.0062, 16 µs/rev
mozilla-try x00000_revs_x_added_x_copies 5173c4b6f97c 95d83ee7242d : 362229 revs, 4.480366 s, 4.397816 s, -0.082550 s, × 0.9816, 12 µs/rev
mozilla-try x00000_revs_x000_added_x_copies 9126823d0e9c ca82787bb23c : 359344 revs, 4.369070 s, 4.370538 s, +0.001468 s, × 1.0003, 12 µs/rev
mozilla-try x00000_revs_x0000_added_x0000_copies 8d3fafa80d4b eb884023b810 : 192665 revs, 1.592506 s, 1.570439 s, -0.022067 s, × 0.9861, 8 µs/rev
mozilla-try x00000_revs_x00000_added_x0000_copies 1b661134e2ca 1ae03d022d6d : 228985 revs, 87.824489 s, 88.388512 s, +0.564023 s, × 1.0064, 386 µs/rev
mozilla-try x00000_revs_x00000_added_x000_copies 9b2a99adc05e 8e29777b48e6 : 382065 revs, 43.304637 s, 34.443661 s, -8.860976 s, × 0.7954, 90 µs/rev
private : 459513 revs, 33.853687 s, 27.370148 s, -6.483539 s, × 0.8085, 59 µs/rev
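
The derived columns in these tables appear to follow directly from the two timings and the revision count. The sketch below recomputes them for two rows, assuming Difference = new time - old time, Factor = new time / old time, and time per rev = new time / revisions truncated to whole microseconds. The truncation is inferred from the rows themselves, not stated in the commit message, and `derive_columns` is a hypothetical helper name.

```python
def derive_columns(revs, old_s, new_s):
    """Recompute the derived benchmark columns from the raw measurements.

    Assumed formulas (inferred from the table, not from the benchmark tool):
      difference = new - old, factor = new / old,
      time per rev = new / revs, truncated to whole microseconds.
    """
    difference = round(new_s - old_s, 6)
    factor = round(new_s / old_s, 4)
    # Truncate rather than round: 0.000117 s over 6 revs is listed as 19 µs/rev.
    us_per_rev = int(new_s / revs * 1_000_000)
    return difference, factor, us_per_rev


# The highlighted mozilla-try row and a small mercurial row from the table.
highlight = derive_columns(382065, 43.304637, 34.443661)
small = derive_columns(6, 0.000114, 0.000117)
print(highlight)
print(small)
```

A factor below 1.0 means the new code is faster; most rows sit within a few percent of 1.0, so the gain is concentrated in a handful of copy-heavy cases.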
Differential Revision: https://phab.mercurial-scm.org/D9653
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Wed, 16 Dec 2020 11:11:05 +0100 |
parents | b7b8a1538161 |
children | 71443f742886 |
# Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
# Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import contextlib
import struct
import threading

from .i18n import _
from . import (
    encoding,
    error,
    pycompat,
    util,
    wireprototypes,
    wireprotov1server,
    wireprotov2server,
)
from .interfaces import util as interfaceutil
from .utils import (
    cborutil,
    compression,
)

stringio = util.stringio

urlerr = util.urlerr
urlreq = util.urlreq

HTTP_OK = 200

HGTYPE = b'application/mercurial-0.1'
HGTYPE2 = b'application/mercurial-0.2'
HGERRTYPE = b'application/hg-error'

SSHV1 = wireprototypes.SSHV1
SSHV2 = wireprototypes.SSHV2


def decodevaluefromheaders(req, headerprefix):
    """Decode a long value from multiple HTTP request headers.

    Returns the value as a bytes, not a str.
    """
    chunks = []
    i = 1
    while True:
        v = req.headers.get(b'%s-%d' % (headerprefix, i))
        if v is None:
            break
        chunks.append(pycompat.bytesurl(v))
        i += 1

    return b''.join(chunks)


@interfaceutil.implementer(wireprototypes.baseprotocolhandler)
class httpv1protocolhandler(object):
    def __init__(self, req, ui, checkperm):
        self._req = req
        self._ui = ui
        self._checkperm = checkperm
        self._protocaps = None

    @property
    def name(self):
        return b'http-v1'

    def getargs(self, args):
        knownargs = self._args()
        data = {}
        keys = args.split()
        for k in keys:
            if k == b'*':
                star = {}
                for key in knownargs.keys():
                    if key != b'cmd' and key not in keys:
                        star[key] = knownargs[key][0]
                data[b'*'] = star
            else:
                data[k] = knownargs[k][0]
        return [data[k] for k in keys]

    def _args(self):
        args = self._req.qsparams.asdictoflists()
        postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
        if postlen:
            args.update(
                urlreq.parseqs(
                    self._req.bodyfh.read(postlen), keep_blank_values=True
                )
            )
            return args

        argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
        args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
        return args

    def getprotocaps(self):
        if self._protocaps is None:
            value = decodevaluefromheaders(self._req, b'X-HgProto')
            self._protocaps = set(value.split(b' '))
        return self._protocaps

    def getpayload(self):
        # Existing clients *always* send Content-Length.
        length = int(self._req.headers[b'Content-Length'])

        # If httppostargs is used, we need to read Content-Length
        # minus the amount that was consumed by args.
        length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
        return util.filechunkiter(self._req.bodyfh, limit=length)

    @contextlib.contextmanager
    def mayberedirectstdio(self):
        oldout = self._ui.fout
        olderr = self._ui.ferr

        out = util.stringio()

        try:
            self._ui.fout = out
            self._ui.ferr = out
            yield out
        finally:
            self._ui.fout = oldout
            self._ui.ferr = olderr

    def client(self):
        return b'remote:%s:%s:%s' % (
            self._req.urlscheme,
            urlreq.quote(self._req.remotehost or b''),
            urlreq.quote(self._req.remoteuser or b''),
        )

    def addcapabilities(self, repo, caps):
        caps.append(b'batch')

        caps.append(
            b'httpheader=%d' % repo.ui.configint(b'server', b'maxhttpheaderlen')
        )
        if repo.ui.configbool(b'experimental', b'httppostargs'):
            caps.append(b'httppostargs')

        # FUTURE advertise 0.2rx once support is implemented
        # FUTURE advertise minrx and mintx after consulting config option
        caps.append(b'httpmediatype=0.1rx,0.1tx,0.2tx')

        compengines = wireprototypes.supportedcompengines(
            repo.ui, compression.SERVERROLE
        )
        if compengines:
            comptypes = b','.join(
                urlreq.quote(e.wireprotosupport().name) for e in compengines
            )
            caps.append(b'compression=%s' % comptypes)

        return caps

    def checkperm(self, perm):
        return self._checkperm(perm)


# This method exists mostly so that extensions like remotefilelog can
# disable a kludgey legacy method only over http. As of early 2018,
# there are no other known users, so with any luck we can discard this
# hook if remotefilelog becomes a first-party extension.
def iscmd(cmd):
    return cmd in wireprotov1server.commands


def handlewsgirequest(rctx, req, res, checkperm):
    """Possibly process a wire protocol request.

    If the current request is a wire protocol request, the request is
    processed by this function.

    ``req`` is a ``parsedrequest`` instance.
    ``res`` is a ``wsgiresponse`` instance.

    Returns a bool indicating if the request was serviced. If set, the caller
    should stop processing the request, as a response has already been issued.
    """
    # Avoid cycle involving hg module.
    from .hgweb import common as hgwebcommon

    repo = rctx.repo

    # HTTP version 1 wire protocol requests are denoted by a "cmd" query
    # string parameter. If it isn't present, this isn't a wire protocol
    # request.
    if b'cmd' not in req.qsparams:
        return False

    cmd = req.qsparams[b'cmd']

    # The "cmd" request parameter is used by both the wire protocol and hgweb.
    # While not all wire protocol commands are available for all transports,
    # if we see a "cmd" value that resembles a known wire protocol command, we
    # route it to a protocol handler. This is better than routing possible
    # wire protocol requests to hgweb because it prevents hgweb from using
    # known wire protocol commands and it is less confusing for machine
    # clients.
    if not iscmd(cmd):
        return False

    # The "cmd" query string argument is only valid on the root path of the
    # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
    # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
    # in this case. We send an HTTP 404 for backwards compatibility reasons.
    if req.dispatchpath:
        res.status = hgwebcommon.statusmessage(404)
        res.headers[b'Content-Type'] = HGTYPE
        # TODO This is not a good response to issue for this request. This
        # is mostly for BC for now.
        res.setbodybytes(b'0\n%s\n' % b'Not Found')
        return True

    proto = httpv1protocolhandler(
        req, repo.ui, lambda perm: checkperm(rctx, req, perm)
    )

    # The permissions checker should be the only thing that can raise an
    # ErrorResponse. It is kind of a layer violation to catch an hgweb
    # exception here. So consider refactoring into an exception type that
    # is associated with the wire protocol.
    try:
        _callhttp(repo, req, res, proto, cmd)
    except hgwebcommon.ErrorResponse as e:
        for k, v in e.headers:
            res.headers[k] = v
        res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
        # TODO This response body assumes the failed command was
        # "unbundle." That assumption is not always valid.
        res.setbodybytes(b'0\n%s\n' % pycompat.bytestr(e))

    return True


def _availableapis(repo):
    apis = set()

    # Registered APIs are made available via config options of the name of
    # the protocol.
    for k, v in API_HANDLERS.items():
        section, option = v[b'config']
        if repo.ui.configbool(section, option):
            apis.add(k)

    return apis


def handlewsgiapirequest(rctx, req, res, checkperm):
    """Handle requests to /api/*."""
    assert req.dispatchparts[0] == b'api'

    repo = rctx.repo

    # This whole URL space is experimental for now. But we want to
    # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
    if not repo.ui.configbool(b'experimental', b'web.apiserver'):
        res.status = b'404 Not Found'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(_(b'Experimental API server endpoint not enabled'))
        return

    # The URL space is /api/<protocol>/*. The structure of URLs under varies
    # by <protocol>.

    availableapis = _availableapis(repo)

    # Requests to /api/ list available APIs.
    if req.dispatchparts == [b'api']:
        res.status = b'200 OK'
        res.headers[b'Content-Type'] = b'text/plain'
        lines = [
            _(
                b'APIs can be accessed at /api/<name>, where <name> can be '
                b'one of the following:\n'
            )
        ]
        if availableapis:
            lines.extend(sorted(availableapis))
        else:
            lines.append(_(b'(no available APIs)\n'))
        res.setbodybytes(b'\n'.join(lines))
        return

    proto = req.dispatchparts[1]

    if proto not in API_HANDLERS:
        res.status = b'404 Not Found'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(
            _(b'Unknown API: %s\nKnown APIs: %s')
            % (proto, b', '.join(sorted(availableapis)))
        )
        return

    if proto not in availableapis:
        res.status = b'404 Not Found'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(_(b'API %s not enabled\n') % proto)
        return

    API_HANDLERS[proto][b'handler'](
        rctx, req, res, checkperm, req.dispatchparts[2:]
    )


# Maps API name to metadata so custom API can be registered.
# Keys are:
#
# config
#    Config option that controls whether service is enabled.
# handler
#    Callable receiving (rctx, req, res, checkperm, urlparts) that is called
#    when a request to this API is received.
# apidescriptor
#    Callable receiving (req, repo) that is called to obtain an API
#    descriptor for this service. The response must be serializable to CBOR.
API_HANDLERS = {
    wireprotov2server.HTTP_WIREPROTO_V2: {
        b'config': (b'experimental', b'web.api.http-v2'),
        b'handler': wireprotov2server.handlehttpv2request,
        b'apidescriptor': wireprotov2server.httpv2apidescriptor,
    },
}


def _httpresponsetype(ui, proto, prefer_uncompressed):
    """Determine the appropriate response type and compression settings.

    Returns a tuple of (mediatype, compengine, engineopts).
    """
    # Determine the response media type and compression engine based
    # on the request parameters.

    if b'0.2' in proto.getprotocaps():
        # All clients are expected to support uncompressed data.
        if prefer_uncompressed:
            return HGTYPE2, compression._noopengine(), {}

        # Now find an agreed upon compression format.
        compformats = wireprotov1server.clientcompressionsupport(proto)
        for engine in wireprototypes.supportedcompengines(
            ui, compression.SERVERROLE
        ):
            if engine.wireprotosupport().name in compformats:
                opts = {}
                level = ui.configint(b'server', b'%slevel' % engine.name())
                if level is not None:
                    opts[b'level'] = level

                return HGTYPE2, engine, opts

        # No mutually supported compression format. Fall back to the
        # legacy protocol.

    # Don't allow untrusted settings because disabling compression or
    # setting a very high compression level could lead to flooding
    # the server's network or CPU.
    opts = {b'level': ui.configint(b'server', b'zliblevel')}
    return HGTYPE, util.compengines[b'zlib'], opts


def processcapabilitieshandshake(repo, req, res, proto):
    """Called during a ?cmd=capabilities request.

    If the client is advertising support for a newer protocol, we send
    a CBOR response with information about available services. If no
    advertised services are available, we don't handle the request.
    """
    # Fall back to old behavior unless the API server is enabled.
    if not repo.ui.configbool(b'experimental', b'web.apiserver'):
        return False

    clientapis = decodevaluefromheaders(req, b'X-HgUpgrade')
    protocaps = decodevaluefromheaders(req, b'X-HgProto')
    if not clientapis or not protocaps:
        return False

    # We currently only support CBOR responses.
    protocaps = set(protocaps.split(b' '))
    if b'cbor' not in protocaps:
        return False

    descriptors = {}

    for api in sorted(set(clientapis.split()) & _availableapis(repo)):
        handler = API_HANDLERS[api]

        descriptorfn = handler.get(b'apidescriptor')
        if not descriptorfn:
            continue

        descriptors[api] = descriptorfn(req, repo)

    v1caps = wireprotov1server.dispatch(repo, proto, b'capabilities')
    assert isinstance(v1caps, wireprototypes.bytesresponse)

    m = {
        # TODO allow this to be configurable.
        b'apibase': b'api/',
        b'apis': descriptors,
        b'v1capabilities': v1caps.data,
    }

    res.status = b'200 OK'
    res.headers[b'Content-Type'] = b'application/mercurial-cbor'
    res.setbodybytes(b''.join(cborutil.streamencode(m)))

    return True


def _callhttp(repo, req, res, proto, cmd):
    # Avoid cycle involving hg module.
    from .hgweb import common as hgwebcommon

    def genversion2(gen, engine, engineopts):
        # application/mercurial-0.2 always sends a payload header
        # identifying the compression engine.
        name = engine.wireprotosupport().name
        assert 0 < len(name) < 256
        yield struct.pack(b'B', len(name))
        yield name

        for chunk in gen:
            yield chunk

    def setresponse(code, contenttype, bodybytes=None, bodygen=None):
        if code == HTTP_OK:
            res.status = b'200 Script output follows'
        else:
            res.status = hgwebcommon.statusmessage(code)

        res.headers[b'Content-Type'] = contenttype

        if bodybytes is not None:
            res.setbodybytes(bodybytes)
        if bodygen is not None:
            res.setbodygen(bodygen)

    if not wireprotov1server.commands.commandavailable(cmd, proto):
        setresponse(
            HTTP_OK,
            HGERRTYPE,
            _(
                b'requested wire protocol command is not available over '
                b'HTTP'
            ),
        )
        return

    proto.checkperm(wireprotov1server.commands[cmd].permission)

    # Possibly handle a modern client wanting to switch protocols.
    if cmd == b'capabilities' and processcapabilitieshandshake(
        repo, req, res, proto
    ):

        return

    rsp = wireprotov1server.dispatch(repo, proto, cmd)

    if isinstance(rsp, bytes):
        setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
    elif isinstance(rsp, wireprototypes.bytesresponse):
        setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
    elif isinstance(rsp, wireprototypes.streamreslegacy):
        setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
    elif isinstance(rsp, wireprototypes.streamres):
        gen = rsp.gen

        # This code for compression should not be streamres specific. It
        # is here because we only compress streamres at the moment.
        mediatype, engine, engineopts = _httpresponsetype(
            repo.ui, proto, rsp.prefer_uncompressed
        )
        gen = engine.compressstream(gen, engineopts)

        if mediatype == HGTYPE2:
            gen = genversion2(gen, engine, engineopts)

        setresponse(HTTP_OK, mediatype, bodygen=gen)
    elif isinstance(rsp, wireprototypes.pushres):
        rsp = b'%d\n%s' % (rsp.res, rsp.output)
        setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
    elif isinstance(rsp, wireprototypes.pusherr):
        rsp = b'0\n%s\n' % rsp.res
        res.drain = True
        setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
    elif isinstance(rsp, wireprototypes.ooberror):
        setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
    else:
        raise error.ProgrammingError(b'hgweb.protocol internal failure', rsp)


def _sshv1respondbytes(fout, value):
    """Send a bytes response for protocol version 1."""
    fout.write(b'%d\n' % len(value))
    fout.write(value)
    fout.flush()


def _sshv1respondstream(fout, source):
    write = fout.write
    for chunk in source.gen:
        write(chunk)
    fout.flush()


def _sshv1respondooberror(fout, ferr, rsp):
    ferr.write(b'%s\n-\n' % rsp)
    ferr.flush()
    fout.write(b'\n')
    fout.flush()


@interfaceutil.implementer(wireprototypes.baseprotocolhandler)
class sshv1protocolhandler(object):
    """Handler for requests services via version 1 of SSH protocol."""

    def __init__(self, ui, fin, fout):
        self._ui = ui
        self._fin = fin
        self._fout = fout
        self._protocaps = set()

    @property
    def name(self):
        return wireprototypes.SSHV1

    def getargs(self, args):
        data = {}
        keys = args.split()
        for n in pycompat.xrange(len(keys)):
            argline = self._fin.readline()[:-1]
            arg, l = argline.split()
            if arg not in keys:
                raise error.Abort(_(b"unexpected parameter %r") % arg)
            if arg == b'*':
                star = {}
                for k in pycompat.xrange(int(l)):
                    argline = self._fin.readline()[:-1]
                    arg, l = argline.split()
                    val = self._fin.read(int(l))
                    star[arg] = val
                data[b'*'] = star
            else:
                val = self._fin.read(int(l))
                data[arg] = val
        return [data[k] for k in keys]

    def getprotocaps(self):
        return self._protocaps

    def getpayload(self):
        # We initially send an empty response. This tells the client it is
        # OK to start sending data. If a client sees any other response, it
        # interprets it as an error.
        _sshv1respondbytes(self._fout, b'')

        # The file is in the form:
        #
        # <chunk size>\n<chunk>
        # ...
        # 0\n
        count = int(self._fin.readline())
        while count:
            yield self._fin.read(count)
            count = int(self._fin.readline())

    @contextlib.contextmanager
    def mayberedirectstdio(self):
        yield None

    def client(self):
        client = encoding.environ.get(b'SSH_CLIENT', b'').split(b' ', 1)[0]
        return b'remote:ssh:' + client

    def addcapabilities(self, repo, caps):
        if self.name == wireprototypes.SSHV1:
            caps.append(b'protocaps')
        caps.append(b'batch')
        return caps

    def checkperm(self, perm):
        pass


class sshv2protocolhandler(sshv1protocolhandler):
    """Protocol handler for version 2 of the SSH protocol."""

    @property
    def name(self):
        return wireprototypes.SSHV2

    def addcapabilities(self, repo, caps):
        return caps


def _runsshserver(ui, repo, fin, fout, ev):
    # This function operates like a state machine of sorts. The following
    # states are defined:
    #
    # protov1-serving
    #    Server is in protocol version 1 serving mode. Commands arrive on
    #    new lines. These commands are processed in this state, one command
    #    after the other.
    #
    # protov2-serving
    #    Server is in protocol version 2 serving mode.
    #
    # upgrade-initial
    #    The server is going to process an upgrade request.
    #
    # upgrade-v2-filter-legacy-handshake
    #    The protocol is being upgraded to version 2. The server is expecting
    #    the legacy handshake from version 1.
    #
    # upgrade-v2-finish
    #    The upgrade to version 2 of the protocol is imminent.
    #
    # shutdown
    #    The server is shutting down, possibly in reaction to a client event.
    #
    # And here are their transitions:
    #
    # protov1-serving -> shutdown
    #    When server receives an empty request or encounters another
    #    error.
    #
    # protov1-serving -> upgrade-initial
    #    An upgrade request line was seen.
    #
    # upgrade-initial -> upgrade-v2-filter-legacy-handshake
    #    Upgrade to version 2 in progress. Server is expecting to
    #    process a legacy handshake.
    #
    # upgrade-v2-filter-legacy-handshake -> shutdown
    #    Client did not fulfill upgrade handshake requirements.
    #
    # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
    #    Client fulfilled version 2 upgrade requirements. Finishing that
    #    upgrade.
    #
    # upgrade-v2-finish -> protov2-serving
    #    Protocol upgrade to version 2 complete. Server can now speak protocol
    #    version 2.
    #
    # protov2-serving -> protov1-serving
    #    This happens by default since protocol version 2 is the same as
    #    version 1 except for the handshake.

    state = b'protov1-serving'
    proto = sshv1protocolhandler(ui, fin, fout)
    protoswitched = False

    while not ev.is_set():
        if state == b'protov1-serving':
            # Commands are issued on new lines.
            request = fin.readline()[:-1]

            # Empty lines signal to terminate the connection.
            if not request:
                state = b'shutdown'
                continue

            # It looks like a protocol upgrade request. Transition state to
            # handle it.
            if request.startswith(b'upgrade '):
                if protoswitched:
                    _sshv1respondooberror(
                        fout,
                        ui.ferr,
                        b'cannot upgrade protocols multiple times',
                    )
                    state = b'shutdown'
                    continue

                state = b'upgrade-initial'
                continue

            available = wireprotov1server.commands.commandavailable(
                request, proto
            )

            # This command isn't available. Send an empty response and go
            # back to waiting for a new command.
            if not available:
                _sshv1respondbytes(fout, b'')
                continue

            rsp = wireprotov1server.dispatch(repo, proto, request)
            repo.ui.fout.flush()
            repo.ui.ferr.flush()

            if isinstance(rsp, bytes):
                _sshv1respondbytes(fout, rsp)
            elif isinstance(rsp, wireprototypes.bytesresponse):
                _sshv1respondbytes(fout, rsp.data)
            elif isinstance(rsp, wireprototypes.streamres):
                _sshv1respondstream(fout, rsp)
            elif isinstance(rsp, wireprototypes.streamreslegacy):
                _sshv1respondstream(fout, rsp)
            elif isinstance(rsp, wireprototypes.pushres):
                _sshv1respondbytes(fout, b'')
                _sshv1respondbytes(fout, b'%d' % rsp.res)
            elif isinstance(rsp, wireprototypes.pusherr):
                _sshv1respondbytes(fout, rsp.res)
            elif isinstance(rsp, wireprototypes.ooberror):
                _sshv1respondooberror(fout, ui.ferr, rsp.message)
            else:
                raise error.ProgrammingError(
                    b'unhandled response type from '
                    b'wire protocol command: %s' % rsp
                )

        # For now, protocol version 2 serving just goes back to version 1.
        elif state == b'protov2-serving':
            state = b'protov1-serving'
            continue

        elif state == b'upgrade-initial':
            # We should never transition into this state if we've switched
            # protocols.
            assert not protoswitched
            assert proto.name == wireprototypes.SSHV1

            # Expected: upgrade <token> <capabilities>
            # If we get something else, the request is malformed. It could be
            # from a future client that has altered the upgrade line content.
            # We treat this as an unknown command.
            try:
                token, caps = request.split(b' ')[1:]
            except ValueError:
                _sshv1respondbytes(fout, b'')
                state = b'protov1-serving'
                continue

            # Send empty response if we don't support upgrading protocols.
            if not ui.configbool(b'experimental', b'sshserver.support-v2'):
                _sshv1respondbytes(fout, b'')
                state = b'protov1-serving'
                continue

            try:
                caps = urlreq.parseqs(caps)
            except ValueError:
                _sshv1respondbytes(fout, b'')
                state = b'protov1-serving'
                continue

            # We don't see an upgrade request to protocol version 2. Ignore
            # the upgrade request.
            wantedprotos = caps.get(b'proto', [b''])[0]
            if SSHV2 not in wantedprotos:
                _sshv1respondbytes(fout, b'')
                state = b'protov1-serving'
                continue

            # It looks like we can honor this upgrade request to protocol 2.
            # Filter the rest of the handshake protocol request lines.
            state = b'upgrade-v2-filter-legacy-handshake'
            continue

        elif state == b'upgrade-v2-filter-legacy-handshake':
            # Client should have sent legacy handshake after an ``upgrade``
            # request. Expected lines:
            #
            #    hello
            #    between
            #    pairs 81
            #    0000...-0000...

            ok = True
            for line in (b'hello', b'between', b'pairs 81'):
                request = fin.readline()[:-1]

                if request != line:
                    _sshv1respondooberror(
                        fout,
                        ui.ferr,
                        b'malformed handshake protocol: missing %s' % line,
                    )
                    ok = False
                    state = b'shutdown'
                    break

            if not ok:
                continue

            request = fin.read(81)
            if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
                _sshv1respondooberror(
                    fout,
                    ui.ferr,
                    b'malformed handshake protocol: '
                    b'missing between argument value',
                )
                state = b'shutdown'
                continue

            state = b'upgrade-v2-finish'
            continue

        elif state == b'upgrade-v2-finish':
            # Send the upgrade response.
            fout.write(b'upgraded %s %s\n' % (token, SSHV2))
            servercaps = wireprotov1server.capabilities(repo, proto)
            rsp = b'capabilities: %s' % servercaps.data
            fout.write(b'%d\n%s\n' % (len(rsp), rsp))
            fout.flush()

            proto = sshv2protocolhandler(ui, fin, fout)
            protoswitched = True

            state = b'protov2-serving'
            continue

        elif state == b'shutdown':
            break

        else:
            raise error.ProgrammingError(
                b'unhandled ssh server state: %s' % state
            )


class sshserver(object):
    def __init__(self, ui, repo, logfh=None):
        self._ui = ui
        self._repo = repo
        self._fin, self._fout = ui.protectfinout()

        # Log write I/O to stdout and stderr if configured.
        if logfh:
            self._fout = util.makeloggingfileobject(
                logfh, self._fout, b'o', logdata=True
            )
            ui.ferr = util.makeloggingfileobject(
                logfh, ui.ferr, b'e', logdata=True
            )

    def serve_forever(self):
        self.serveuntil(threading.Event())
        self._ui.restorefinout(self._fin, self._fout)

    def serveuntil(self, ev):
        """Serve until a threading.Event is set."""
        _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
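
The SSH version 1 framing used above can be exercised in isolation. The following is a minimal standalone sketch, not part of Mercurial: `respond_bytes` mirrors `_sshv1respondbytes`, `read_payload` mirrors the loop in `sshv1protocolhandler.getpayload`, and `read_bytes_response` is a hypothetical client-side reader added only for illustration. In-memory `io.BytesIO` objects stand in for the real stdin/stdout pipes.

```python
import io


def respond_bytes(fout, value):
    # Same framing as _sshv1respondbytes: decimal length, newline, raw bytes.
    fout.write(b'%d\n' % len(value))
    fout.write(value)
    fout.flush()


def read_bytes_response(fin):
    # Hypothetical client-side counterpart: read the length line, then the body.
    length = int(fin.readline())
    return fin.read(length)


def read_payload(fin):
    # Same loop as getpayload: "<chunk size>\n<chunk>" repeated, ended by "0\n".
    count = int(fin.readline())
    while count:
        yield fin.read(count)
        count = int(fin.readline())


wire = io.BytesIO()
respond_bytes(wire, b'hello')
framed = wire.getvalue()

decoded = read_bytes_response(io.BytesIO(framed))
chunks = list(read_payload(io.BytesIO(b'3\nabc2\nde0\n')))
print(framed, decoded, chunks)
```

Under this framing an empty response is the single line `0\n`, which is what the server sends for unavailable commands and before it starts reading a payload.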