Mercurial > hg
comparison mercurial/wireproto.py @ 37612:5e71dea79aae
wireproto: move value encoding functions to wireprototypes (API)
These functions should live in the same place. I plan to separate
client from server code in upcoming commits. wireprototypes is
where we are putting shared code like this.
Differential Revision: https://phab.mercurial-scm.org/D3257
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 11 Apr 2018 10:50:58 -0700 |
parents | 3a2367e6c6f2 |
children | 96d735601ca1 |
comparison
equal
deleted
inserted
replaced
37611:ae8730877371 | 37612:5e71dea79aae |
---|---|
113 # Forward a couple of names from peer to make wireproto interactions | 113 # Forward a couple of names from peer to make wireproto interactions |
114 # slightly more sensible. | 114 # slightly more sensible. |
115 batchable = peer.batchable | 115 batchable = peer.batchable |
116 future = peer.future | 116 future = peer.future |
117 | 117 |
118 # list of nodes encoding / decoding | |
119 | |
120 def decodelist(l, sep=' '): | |
121 if l: | |
122 return [bin(v) for v in l.split(sep)] | |
123 return [] | |
124 | |
125 def encodelist(l, sep=' '): | |
126 try: | |
127 return sep.join(map(hex, l)) | |
128 except TypeError: | |
129 raise | |
130 | |
131 # batched call argument encoding | |
132 | |
133 def escapearg(plain): | |
134 return (plain | |
135 .replace(':', ':c') | |
136 .replace(',', ':o') | |
137 .replace(';', ':s') | |
138 .replace('=', ':e')) | |
139 | |
140 def unescapearg(escaped): | |
141 return (escaped | |
142 .replace(':e', '=') | |
143 .replace(':s', ';') | |
144 .replace(':o', ',') | |
145 .replace(':c', ':')) | |
146 | 118 |
147 def encodebatchcmds(req): | 119 def encodebatchcmds(req): |
148 """Return a ``cmds`` argument value for the ``batch`` command.""" | 120 """Return a ``cmds`` argument value for the ``batch`` command.""" |
121 escapearg = wireprototypes.escapebatcharg | |
122 | |
149 cmds = [] | 123 cmds = [] |
150 for op, argsdict in req: | 124 for op, argsdict in req: |
151 # Old servers didn't properly unescape argument names. So prevent | 125 # Old servers didn't properly unescape argument names. So prevent |
152 # the sending of argument names that may not be decoded properly by | 126 # the sending of argument names that may not be decoded properly by |
153 # servers. | 127 # servers. |
225 def heads(self): | 199 def heads(self): |
226 f = future() | 200 f = future() |
227 yield {}, f | 201 yield {}, f |
228 d = f.value | 202 d = f.value |
229 try: | 203 try: |
230 yield decodelist(d[:-1]) | 204 yield wireprototypes.decodelist(d[:-1]) |
231 except ValueError: | 205 except ValueError: |
232 self._abort(error.ResponseError(_("unexpected response:"), d)) | 206 self._abort(error.ResponseError(_("unexpected response:"), d)) |
233 | 207 |
234 @batchable | 208 @batchable |
235 def known(self, nodes): | 209 def known(self, nodes): |
236 f = future() | 210 f = future() |
237 yield {'nodes': encodelist(nodes)}, f | 211 yield {'nodes': wireprototypes.encodelist(nodes)}, f |
238 d = f.value | 212 d = f.value |
239 try: | 213 try: |
240 yield [bool(int(b)) for b in d] | 214 yield [bool(int(b)) for b in d] |
241 except ValueError: | 215 except ValueError: |
242 self._abort(error.ResponseError(_("unexpected response:"), d)) | 216 self._abort(error.ResponseError(_("unexpected response:"), d)) |
249 try: | 223 try: |
250 branchmap = {} | 224 branchmap = {} |
251 for branchpart in d.splitlines(): | 225 for branchpart in d.splitlines(): |
252 branchname, branchheads = branchpart.split(' ', 1) | 226 branchname, branchheads = branchpart.split(' ', 1) |
253 branchname = encoding.tolocal(urlreq.unquote(branchname)) | 227 branchname = encoding.tolocal(urlreq.unquote(branchname)) |
254 branchheads = decodelist(branchheads) | 228 branchheads = wireprototypes.decodelist(branchheads) |
255 branchmap[branchname] = branchheads | 229 branchmap[branchname] = branchheads |
256 yield branchmap | 230 yield branchmap |
257 except TypeError: | 231 except TypeError: |
258 self._abort(error.ResponseError(_("unexpected response:"), d)) | 232 self._abort(error.ResponseError(_("unexpected response:"), d)) |
259 | 233 |
304 keytype = gboptsmap.get(key) | 278 keytype = gboptsmap.get(key) |
305 if keytype is None: | 279 if keytype is None: |
306 raise error.ProgrammingError( | 280 raise error.ProgrammingError( |
307 'Unexpectedly None keytype for key %s' % key) | 281 'Unexpectedly None keytype for key %s' % key) |
308 elif keytype == 'nodes': | 282 elif keytype == 'nodes': |
309 value = encodelist(value) | 283 value = wireprototypes.encodelist(value) |
310 elif keytype == 'csv': | 284 elif keytype == 'csv': |
311 value = ','.join(value) | 285 value = ','.join(value) |
312 elif keytype == 'scsv': | 286 elif keytype == 'scsv': |
313 value = ','.join(sorted(value)) | 287 value = ','.join(sorted(value)) |
314 elif keytype == 'boolean': | 288 elif keytype == 'boolean': |
336 `url` is the url the client thinks it's pushing to, which is | 310 `url` is the url the client thinks it's pushing to, which is |
337 visible to hooks. | 311 visible to hooks. |
338 ''' | 312 ''' |
339 | 313 |
340 if heads != ['force'] and self.capable('unbundlehash'): | 314 if heads != ['force'] and self.capable('unbundlehash'): |
341 heads = encodelist(['hashed', | 315 heads = wireprototypes.encodelist( |
342 hashlib.sha1(''.join(sorted(heads))).digest()]) | 316 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]) |
343 else: | 317 else: |
344 heads = encodelist(heads) | 318 heads = wireprototypes.encodelist(heads) |
345 | 319 |
346 if util.safehasattr(cg, 'deltaheader'): | 320 if util.safehasattr(cg, 'deltaheader'): |
347 # this a bundle10, do the old style call sequence | 321 # this a bundle10, do the old style call sequence |
348 ret, output = self._callpush("unbundle", cg, heads=heads) | 322 ret, output = self._callpush("unbundle", cg, heads=heads) |
349 if ret == "": | 323 if ret == "": |
366 # End of ipeercommands interface. | 340 # End of ipeercommands interface. |
367 | 341 |
368 # Begin of ipeerlegacycommands interface. | 342 # Begin of ipeerlegacycommands interface. |
369 | 343 |
370 def branches(self, nodes): | 344 def branches(self, nodes): |
371 n = encodelist(nodes) | 345 n = wireprototypes.encodelist(nodes) |
372 d = self._call("branches", nodes=n) | 346 d = self._call("branches", nodes=n) |
373 try: | 347 try: |
374 br = [tuple(decodelist(b)) for b in d.splitlines()] | 348 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()] |
375 return br | 349 return br |
376 except ValueError: | 350 except ValueError: |
377 self._abort(error.ResponseError(_("unexpected response:"), d)) | 351 self._abort(error.ResponseError(_("unexpected response:"), d)) |
378 | 352 |
379 def between(self, pairs): | 353 def between(self, pairs): |
380 batch = 8 # avoid giant requests | 354 batch = 8 # avoid giant requests |
381 r = [] | 355 r = [] |
382 for i in xrange(0, len(pairs), batch): | 356 for i in xrange(0, len(pairs), batch): |
383 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]]) | 357 n = " ".join([wireprototypes.encodelist(p, '-') |
358 for p in pairs[i:i + batch]]) | |
384 d = self._call("between", pairs=n) | 359 d = self._call("between", pairs=n) |
385 try: | 360 try: |
386 r.extend(l and decodelist(l) or [] for l in d.splitlines()) | 361 r.extend(l and wireprototypes.decodelist(l) or [] |
362 for l in d.splitlines()) | |
387 except ValueError: | 363 except ValueError: |
388 self._abort(error.ResponseError(_("unexpected response:"), d)) | 364 self._abort(error.ResponseError(_("unexpected response:"), d)) |
389 return r | 365 return r |
390 | 366 |
391 def changegroup(self, nodes, kind): | 367 def changegroup(self, nodes, kind): |
392 n = encodelist(nodes) | 368 n = wireprototypes.encodelist(nodes) |
393 f = self._callcompressable("changegroup", roots=n) | 369 f = self._callcompressable("changegroup", roots=n) |
394 return changegroupmod.cg1unpacker(f, 'UN') | 370 return changegroupmod.cg1unpacker(f, 'UN') |
395 | 371 |
396 def changegroupsubset(self, bases, heads, kind): | 372 def changegroupsubset(self, bases, heads, kind): |
397 self.requirecap('changegroupsubset', _('look up remote changes')) | 373 self.requirecap('changegroupsubset', _('look up remote changes')) |
398 bases = encodelist(bases) | 374 bases = wireprototypes.encodelist(bases) |
399 heads = encodelist(heads) | 375 heads = wireprototypes.encodelist(heads) |
400 f = self._callcompressable("changegroupsubset", | 376 f = self._callcompressable("changegroupsubset", |
401 bases=bases, heads=heads) | 377 bases=bases, heads=heads) |
402 return changegroupmod.cg1unpacker(f, 'UN') | 378 return changegroupmod.cg1unpacker(f, 'UN') |
403 | 379 |
404 # End of ipeerlegacycommands interface. | 380 # End of ipeerlegacycommands interface. |
412 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'): | 388 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'): |
413 ui.debug('devel-peer-request: batched-content\n') | 389 ui.debug('devel-peer-request: batched-content\n') |
414 for op, args in req: | 390 for op, args in req: |
415 msg = 'devel-peer-request: - %s (%d arguments)\n' | 391 msg = 'devel-peer-request: - %s (%d arguments)\n' |
416 ui.debug(msg % (op, len(args))) | 392 ui.debug(msg % (op, len(args))) |
393 | |
394 unescapearg = wireprototypes.unescapebatcharg | |
417 | 395 |
418 rsp = self._callstream("batch", cmds=encodebatchcmds(req)) | 396 rsp = self._callstream("batch", cmds=encodebatchcmds(req)) |
419 chunk = rsp.read(1024) | 397 chunk = rsp.read(1024) |
420 work = [chunk] | 398 work = [chunk] |
421 while chunk: | 399 while chunk: |
791 | 769 |
792 # TODO define a more appropriate permissions type to use for this. | 770 # TODO define a more appropriate permissions type to use for this. |
793 @wireprotocommand('batch', 'cmds *', permission='pull', | 771 @wireprotocommand('batch', 'cmds *', permission='pull', |
794 transportpolicy=POLICY_V1_ONLY) | 772 transportpolicy=POLICY_V1_ONLY) |
795 def batch(repo, proto, cmds, others): | 773 def batch(repo, proto, cmds, others): |
774 unescapearg = wireprototypes.unescapebatcharg | |
796 repo = repo.filtered("served") | 775 repo = repo.filtered("served") |
797 res = [] | 776 res = [] |
798 for pair in cmds.split(';'): | 777 for pair in cmds.split(';'): |
799 op, args = pair.split(' ', 1) | 778 op, args = pair.split(' ', 1) |
800 vals = {} | 779 vals = {} |
830 # For now, all batchable commands must return bytesresponse or | 809 # For now, all batchable commands must return bytesresponse or |
831 # raw bytes (for backwards compatibility). | 810 # raw bytes (for backwards compatibility). |
832 assert isinstance(result, (wireprototypes.bytesresponse, bytes)) | 811 assert isinstance(result, (wireprototypes.bytesresponse, bytes)) |
833 if isinstance(result, wireprototypes.bytesresponse): | 812 if isinstance(result, wireprototypes.bytesresponse): |
834 result = result.data | 813 result = result.data |
835 res.append(escapearg(result)) | 814 res.append(wireprototypes.escapebatcharg(result)) |
836 | 815 |
837 return wireprototypes.bytesresponse(';'.join(res)) | 816 return wireprototypes.bytesresponse(';'.join(res)) |
838 | 817 |
839 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY, | 818 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY, |
840 permission='pull') | 819 permission='pull') |
841 def between(repo, proto, pairs): | 820 def between(repo, proto, pairs): |
842 pairs = [decodelist(p, '-') for p in pairs.split(" ")] | 821 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")] |
843 r = [] | 822 r = [] |
844 for b in repo.between(pairs): | 823 for b in repo.between(pairs): |
845 r.append(encodelist(b) + "\n") | 824 r.append(wireprototypes.encodelist(b) + "\n") |
846 | 825 |
847 return wireprototypes.bytesresponse(''.join(r)) | 826 return wireprototypes.bytesresponse(''.join(r)) |
848 | 827 |
849 @wireprotocommand('branchmap', permission='pull', | 828 @wireprotocommand('branchmap', permission='pull', |
850 transportpolicy=POLICY_V1_ONLY) | 829 transportpolicy=POLICY_V1_ONLY) |
851 def branchmap(repo, proto): | 830 def branchmap(repo, proto): |
852 branchmap = repo.branchmap() | 831 branchmap = repo.branchmap() |
853 heads = [] | 832 heads = [] |
854 for branch, nodes in branchmap.iteritems(): | 833 for branch, nodes in branchmap.iteritems(): |
855 branchname = urlreq.quote(encoding.fromlocal(branch)) | 834 branchname = urlreq.quote(encoding.fromlocal(branch)) |
856 branchnodes = encodelist(nodes) | 835 branchnodes = wireprototypes.encodelist(nodes) |
857 heads.append('%s %s' % (branchname, branchnodes)) | 836 heads.append('%s %s' % (branchname, branchnodes)) |
858 | 837 |
859 return wireprototypes.bytesresponse('\n'.join(heads)) | 838 return wireprototypes.bytesresponse('\n'.join(heads)) |
860 | 839 |
861 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY, | 840 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY, |
862 permission='pull') | 841 permission='pull') |
863 def branches(repo, proto, nodes): | 842 def branches(repo, proto, nodes): |
864 nodes = decodelist(nodes) | 843 nodes = wireprototypes.decodelist(nodes) |
865 r = [] | 844 r = [] |
866 for b in repo.branches(nodes): | 845 for b in repo.branches(nodes): |
867 r.append(encodelist(b) + "\n") | 846 r.append(wireprototypes.encodelist(b) + "\n") |
868 | 847 |
869 return wireprototypes.bytesresponse(''.join(r)) | 848 return wireprototypes.bytesresponse(''.join(r)) |
870 | 849 |
871 @wireprotocommand('clonebundles', '', permission='pull', | 850 @wireprotocommand('clonebundles', '', permission='pull', |
872 transportpolicy=POLICY_V1_ONLY) | 851 transportpolicy=POLICY_V1_ONLY) |
929 return wireprototypes.bytesresponse(' '.join(sorted(caps))) | 908 return wireprototypes.bytesresponse(' '.join(sorted(caps))) |
930 | 909 |
931 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY, | 910 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY, |
932 permission='pull') | 911 permission='pull') |
933 def changegroup(repo, proto, roots): | 912 def changegroup(repo, proto, roots): |
934 nodes = decodelist(roots) | 913 nodes = wireprototypes.decodelist(roots) |
935 outgoing = discovery.outgoing(repo, missingroots=nodes, | 914 outgoing = discovery.outgoing(repo, missingroots=nodes, |
936 missingheads=repo.heads()) | 915 missingheads=repo.heads()) |
937 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') | 916 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') |
938 gen = iter(lambda: cg.read(32768), '') | 917 gen = iter(lambda: cg.read(32768), '') |
939 return wireprototypes.streamres(gen=gen) | 918 return wireprototypes.streamres(gen=gen) |
940 | 919 |
941 @wireprotocommand('changegroupsubset', 'bases heads', | 920 @wireprotocommand('changegroupsubset', 'bases heads', |
942 transportpolicy=POLICY_V1_ONLY, | 921 transportpolicy=POLICY_V1_ONLY, |
943 permission='pull') | 922 permission='pull') |
944 def changegroupsubset(repo, proto, bases, heads): | 923 def changegroupsubset(repo, proto, bases, heads): |
945 bases = decodelist(bases) | 924 bases = wireprototypes.decodelist(bases) |
946 heads = decodelist(heads) | 925 heads = wireprototypes.decodelist(heads) |
947 outgoing = discovery.outgoing(repo, missingroots=bases, | 926 outgoing = discovery.outgoing(repo, missingroots=bases, |
948 missingheads=heads) | 927 missingheads=heads) |
949 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') | 928 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') |
950 gen = iter(lambda: cg.read(32768), '') | 929 gen = iter(lambda: cg.read(32768), '') |
951 return wireprototypes.streamres(gen=gen) | 930 return wireprototypes.streamres(gen=gen) |
1027 def getbundle(repo, proto, others): | 1006 def getbundle(repo, proto, others): |
1028 opts = options('getbundle', gboptsmap.keys(), others) | 1007 opts = options('getbundle', gboptsmap.keys(), others) |
1029 for k, v in opts.iteritems(): | 1008 for k, v in opts.iteritems(): |
1030 keytype = gboptsmap[k] | 1009 keytype = gboptsmap[k] |
1031 if keytype == 'nodes': | 1010 if keytype == 'nodes': |
1032 opts[k] = decodelist(v) | 1011 opts[k] = wireprototypes.decodelist(v) |
1033 elif keytype == 'csv': | 1012 elif keytype == 'csv': |
1034 opts[k] = list(v.split(',')) | 1013 opts[k] = list(v.split(',')) |
1035 elif keytype == 'scsv': | 1014 elif keytype == 'scsv': |
1036 opts[k] = set(v.split(',')) | 1015 opts[k] = set(v.split(',')) |
1037 elif keytype == 'boolean': | 1016 elif keytype == 'boolean': |
1099 gen=chunks, prefer_uncompressed=not prefercompressed) | 1078 gen=chunks, prefer_uncompressed=not prefercompressed) |
1100 | 1079 |
1101 @wireprotocommand('heads', permission='pull', transportpolicy=POLICY_V1_ONLY) | 1080 @wireprotocommand('heads', permission='pull', transportpolicy=POLICY_V1_ONLY) |
1102 def heads(repo, proto): | 1081 def heads(repo, proto): |
1103 h = repo.heads() | 1082 h = repo.heads() |
1104 return wireprototypes.bytesresponse(encodelist(h) + '\n') | 1083 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n') |
1105 | 1084 |
1106 @wireprotocommand('hello', permission='pull', transportpolicy=POLICY_V1_ONLY) | 1085 @wireprotocommand('hello', permission='pull', transportpolicy=POLICY_V1_ONLY) |
1107 def hello(repo, proto): | 1086 def hello(repo, proto): |
1108 """Called as part of SSH handshake to obtain server info. | 1087 """Called as part of SSH handshake to obtain server info. |
1109 | 1088 |
1138 return wireprototypes.bytesresponse('%d %s\n' % (success, r)) | 1117 return wireprototypes.bytesresponse('%d %s\n' % (success, r)) |
1139 | 1118 |
1140 @wireprotocommand('known', 'nodes *', permission='pull', | 1119 @wireprotocommand('known', 'nodes *', permission='pull', |
1141 transportpolicy=POLICY_V1_ONLY) | 1120 transportpolicy=POLICY_V1_ONLY) |
1142 def known(repo, proto, nodes, others): | 1121 def known(repo, proto, nodes, others): |
1143 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes))) | 1122 v = ''.join(b and '1' or '0' |
1123 for b in repo.known(wireprototypes.decodelist(nodes))) | |
1144 return wireprototypes.bytesresponse(v) | 1124 return wireprototypes.bytesresponse(v) |
1145 | 1125 |
1146 @wireprotocommand('protocaps', 'caps', permission='pull', | 1126 @wireprotocommand('protocaps', 'caps', permission='pull', |
1147 transportpolicy=POLICY_V1_ONLY) | 1127 transportpolicy=POLICY_V1_ONLY) |
1148 def protocaps(repo, proto, caps): | 1128 def protocaps(repo, proto, caps): |
1183 streamclone.generatev1wireproto(repo)) | 1163 streamclone.generatev1wireproto(repo)) |
1184 | 1164 |
1185 @wireprotocommand('unbundle', 'heads', permission='push', | 1165 @wireprotocommand('unbundle', 'heads', permission='push', |
1186 transportpolicy=POLICY_V1_ONLY) | 1166 transportpolicy=POLICY_V1_ONLY) |
1187 def unbundle(repo, proto, heads): | 1167 def unbundle(repo, proto, heads): |
1188 their_heads = decodelist(heads) | 1168 their_heads = wireprototypes.decodelist(heads) |
1189 | 1169 |
1190 with proto.mayberedirectstdio() as output: | 1170 with proto.mayberedirectstdio() as output: |
1191 try: | 1171 try: |
1192 exchange.check_heads(repo, their_heads, 'preparing changes') | 1172 exchange.check_heads(repo, their_heads, 'preparing changes') |
1193 cleanup = lambda: None | 1173 cleanup = lambda: None |