comparison mercurial/wireproto.py @ 37612:5e71dea79aae

wireproto: move value encoding functions to wireprototypes (API). These functions should live in the same place. I plan to separate client from server code in upcoming commits. wireprototypes is where we are putting shared code like this. Differential Revision: https://phab.mercurial-scm.org/D3257
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 11 Apr 2018 10:50:58 -0700
parents 3a2367e6c6f2
children 96d735601ca1
comparison
equal deleted inserted replaced
37611:ae8730877371 37612:5e71dea79aae
113 # Forward a couple of names from peer to make wireproto interactions 113 # Forward a couple of names from peer to make wireproto interactions
114 # slightly more sensible. 114 # slightly more sensible.
115 batchable = peer.batchable 115 batchable = peer.batchable
116 future = peer.future 116 future = peer.future
117 117
118 # list of nodes encoding / decoding
119
120 def decodelist(l, sep=' '):
121 if l:
122 return [bin(v) for v in l.split(sep)]
123 return []
124
125 def encodelist(l, sep=' '):
126 try:
127 return sep.join(map(hex, l))
128 except TypeError:
129 raise
130
131 # batched call argument encoding
132
133 def escapearg(plain):
134 return (plain
135 .replace(':', ':c')
136 .replace(',', ':o')
137 .replace(';', ':s')
138 .replace('=', ':e'))
139
140 def unescapearg(escaped):
141 return (escaped
142 .replace(':e', '=')
143 .replace(':s', ';')
144 .replace(':o', ',')
145 .replace(':c', ':'))
146 118
147 def encodebatchcmds(req): 119 def encodebatchcmds(req):
148 """Return a ``cmds`` argument value for the ``batch`` command.""" 120 """Return a ``cmds`` argument value for the ``batch`` command."""
121 escapearg = wireprototypes.escapebatcharg
122
149 cmds = [] 123 cmds = []
150 for op, argsdict in req: 124 for op, argsdict in req:
151 # Old servers didn't properly unescape argument names. So prevent 125 # Old servers didn't properly unescape argument names. So prevent
152 # the sending of argument names that may not be decoded properly by 126 # the sending of argument names that may not be decoded properly by
153 # servers. 127 # servers.
225 def heads(self): 199 def heads(self):
226 f = future() 200 f = future()
227 yield {}, f 201 yield {}, f
228 d = f.value 202 d = f.value
229 try: 203 try:
230 yield decodelist(d[:-1]) 204 yield wireprototypes.decodelist(d[:-1])
231 except ValueError: 205 except ValueError:
232 self._abort(error.ResponseError(_("unexpected response:"), d)) 206 self._abort(error.ResponseError(_("unexpected response:"), d))
233 207
234 @batchable 208 @batchable
235 def known(self, nodes): 209 def known(self, nodes):
236 f = future() 210 f = future()
237 yield {'nodes': encodelist(nodes)}, f 211 yield {'nodes': wireprototypes.encodelist(nodes)}, f
238 d = f.value 212 d = f.value
239 try: 213 try:
240 yield [bool(int(b)) for b in d] 214 yield [bool(int(b)) for b in d]
241 except ValueError: 215 except ValueError:
242 self._abort(error.ResponseError(_("unexpected response:"), d)) 216 self._abort(error.ResponseError(_("unexpected response:"), d))
249 try: 223 try:
250 branchmap = {} 224 branchmap = {}
251 for branchpart in d.splitlines(): 225 for branchpart in d.splitlines():
252 branchname, branchheads = branchpart.split(' ', 1) 226 branchname, branchheads = branchpart.split(' ', 1)
253 branchname = encoding.tolocal(urlreq.unquote(branchname)) 227 branchname = encoding.tolocal(urlreq.unquote(branchname))
254 branchheads = decodelist(branchheads) 228 branchheads = wireprototypes.decodelist(branchheads)
255 branchmap[branchname] = branchheads 229 branchmap[branchname] = branchheads
256 yield branchmap 230 yield branchmap
257 except TypeError: 231 except TypeError:
258 self._abort(error.ResponseError(_("unexpected response:"), d)) 232 self._abort(error.ResponseError(_("unexpected response:"), d))
259 233
304 keytype = gboptsmap.get(key) 278 keytype = gboptsmap.get(key)
305 if keytype is None: 279 if keytype is None:
306 raise error.ProgrammingError( 280 raise error.ProgrammingError(
307 'Unexpectedly None keytype for key %s' % key) 281 'Unexpectedly None keytype for key %s' % key)
308 elif keytype == 'nodes': 282 elif keytype == 'nodes':
309 value = encodelist(value) 283 value = wireprototypes.encodelist(value)
310 elif keytype == 'csv': 284 elif keytype == 'csv':
311 value = ','.join(value) 285 value = ','.join(value)
312 elif keytype == 'scsv': 286 elif keytype == 'scsv':
313 value = ','.join(sorted(value)) 287 value = ','.join(sorted(value))
314 elif keytype == 'boolean': 288 elif keytype == 'boolean':
336 `url` is the url the client thinks it's pushing to, which is 310 `url` is the url the client thinks it's pushing to, which is
337 visible to hooks. 311 visible to hooks.
338 ''' 312 '''
339 313
340 if heads != ['force'] and self.capable('unbundlehash'): 314 if heads != ['force'] and self.capable('unbundlehash'):
341 heads = encodelist(['hashed', 315 heads = wireprototypes.encodelist(
342 hashlib.sha1(''.join(sorted(heads))).digest()]) 316 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
343 else: 317 else:
344 heads = encodelist(heads) 318 heads = wireprototypes.encodelist(heads)
345 319
346 if util.safehasattr(cg, 'deltaheader'): 320 if util.safehasattr(cg, 'deltaheader'):
347 # this is a bundle10, do the old style call sequence 321 # this is a bundle10, do the old style call sequence
348 ret, output = self._callpush("unbundle", cg, heads=heads) 322 ret, output = self._callpush("unbundle", cg, heads=heads)
349 if ret == "": 323 if ret == "":
366 # End of ipeercommands interface. 340 # End of ipeercommands interface.
367 341
368 # Begin of ipeerlegacycommands interface. 342 # Begin of ipeerlegacycommands interface.
369 343
370 def branches(self, nodes): 344 def branches(self, nodes):
371 n = encodelist(nodes) 345 n = wireprototypes.encodelist(nodes)
372 d = self._call("branches", nodes=n) 346 d = self._call("branches", nodes=n)
373 try: 347 try:
374 br = [tuple(decodelist(b)) for b in d.splitlines()] 348 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
375 return br 349 return br
376 except ValueError: 350 except ValueError:
377 self._abort(error.ResponseError(_("unexpected response:"), d)) 351 self._abort(error.ResponseError(_("unexpected response:"), d))
378 352
379 def between(self, pairs): 353 def between(self, pairs):
380 batch = 8 # avoid giant requests 354 batch = 8 # avoid giant requests
381 r = [] 355 r = []
382 for i in xrange(0, len(pairs), batch): 356 for i in xrange(0, len(pairs), batch):
383 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]]) 357 n = " ".join([wireprototypes.encodelist(p, '-')
358 for p in pairs[i:i + batch]])
384 d = self._call("between", pairs=n) 359 d = self._call("between", pairs=n)
385 try: 360 try:
386 r.extend(l and decodelist(l) or [] for l in d.splitlines()) 361 r.extend(l and wireprototypes.decodelist(l) or []
362 for l in d.splitlines())
387 except ValueError: 363 except ValueError:
388 self._abort(error.ResponseError(_("unexpected response:"), d)) 364 self._abort(error.ResponseError(_("unexpected response:"), d))
389 return r 365 return r
390 366
391 def changegroup(self, nodes, kind): 367 def changegroup(self, nodes, kind):
392 n = encodelist(nodes) 368 n = wireprototypes.encodelist(nodes)
393 f = self._callcompressable("changegroup", roots=n) 369 f = self._callcompressable("changegroup", roots=n)
394 return changegroupmod.cg1unpacker(f, 'UN') 370 return changegroupmod.cg1unpacker(f, 'UN')
395 371
396 def changegroupsubset(self, bases, heads, kind): 372 def changegroupsubset(self, bases, heads, kind):
397 self.requirecap('changegroupsubset', _('look up remote changes')) 373 self.requirecap('changegroupsubset', _('look up remote changes'))
398 bases = encodelist(bases) 374 bases = wireprototypes.encodelist(bases)
399 heads = encodelist(heads) 375 heads = wireprototypes.encodelist(heads)
400 f = self._callcompressable("changegroupsubset", 376 f = self._callcompressable("changegroupsubset",
401 bases=bases, heads=heads) 377 bases=bases, heads=heads)
402 return changegroupmod.cg1unpacker(f, 'UN') 378 return changegroupmod.cg1unpacker(f, 'UN')
403 379
404 # End of ipeerlegacycommands interface. 380 # End of ipeerlegacycommands interface.
412 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'): 388 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
413 ui.debug('devel-peer-request: batched-content\n') 389 ui.debug('devel-peer-request: batched-content\n')
414 for op, args in req: 390 for op, args in req:
415 msg = 'devel-peer-request: - %s (%d arguments)\n' 391 msg = 'devel-peer-request: - %s (%d arguments)\n'
416 ui.debug(msg % (op, len(args))) 392 ui.debug(msg % (op, len(args)))
393
394 unescapearg = wireprototypes.unescapebatcharg
417 395
418 rsp = self._callstream("batch", cmds=encodebatchcmds(req)) 396 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
419 chunk = rsp.read(1024) 397 chunk = rsp.read(1024)
420 work = [chunk] 398 work = [chunk]
421 while chunk: 399 while chunk:
791 769
792 # TODO define a more appropriate permissions type to use for this. 770 # TODO define a more appropriate permissions type to use for this.
793 @wireprotocommand('batch', 'cmds *', permission='pull', 771 @wireprotocommand('batch', 'cmds *', permission='pull',
794 transportpolicy=POLICY_V1_ONLY) 772 transportpolicy=POLICY_V1_ONLY)
795 def batch(repo, proto, cmds, others): 773 def batch(repo, proto, cmds, others):
774 unescapearg = wireprototypes.unescapebatcharg
796 repo = repo.filtered("served") 775 repo = repo.filtered("served")
797 res = [] 776 res = []
798 for pair in cmds.split(';'): 777 for pair in cmds.split(';'):
799 op, args = pair.split(' ', 1) 778 op, args = pair.split(' ', 1)
800 vals = {} 779 vals = {}
830 # For now, all batchable commands must return bytesresponse or 809 # For now, all batchable commands must return bytesresponse or
831 # raw bytes (for backwards compatibility). 810 # raw bytes (for backwards compatibility).
832 assert isinstance(result, (wireprototypes.bytesresponse, bytes)) 811 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
833 if isinstance(result, wireprototypes.bytesresponse): 812 if isinstance(result, wireprototypes.bytesresponse):
834 result = result.data 813 result = result.data
835 res.append(escapearg(result)) 814 res.append(wireprototypes.escapebatcharg(result))
836 815
837 return wireprototypes.bytesresponse(';'.join(res)) 816 return wireprototypes.bytesresponse(';'.join(res))
838 817
839 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY, 818 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
840 permission='pull') 819 permission='pull')
841 def between(repo, proto, pairs): 820 def between(repo, proto, pairs):
842 pairs = [decodelist(p, '-') for p in pairs.split(" ")] 821 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
843 r = [] 822 r = []
844 for b in repo.between(pairs): 823 for b in repo.between(pairs):
845 r.append(encodelist(b) + "\n") 824 r.append(wireprototypes.encodelist(b) + "\n")
846 825
847 return wireprototypes.bytesresponse(''.join(r)) 826 return wireprototypes.bytesresponse(''.join(r))
848 827
849 @wireprotocommand('branchmap', permission='pull', 828 @wireprotocommand('branchmap', permission='pull',
850 transportpolicy=POLICY_V1_ONLY) 829 transportpolicy=POLICY_V1_ONLY)
851 def branchmap(repo, proto): 830 def branchmap(repo, proto):
852 branchmap = repo.branchmap() 831 branchmap = repo.branchmap()
853 heads = [] 832 heads = []
854 for branch, nodes in branchmap.iteritems(): 833 for branch, nodes in branchmap.iteritems():
855 branchname = urlreq.quote(encoding.fromlocal(branch)) 834 branchname = urlreq.quote(encoding.fromlocal(branch))
856 branchnodes = encodelist(nodes) 835 branchnodes = wireprototypes.encodelist(nodes)
857 heads.append('%s %s' % (branchname, branchnodes)) 836 heads.append('%s %s' % (branchname, branchnodes))
858 837
859 return wireprototypes.bytesresponse('\n'.join(heads)) 838 return wireprototypes.bytesresponse('\n'.join(heads))
860 839
861 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY, 840 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
862 permission='pull') 841 permission='pull')
863 def branches(repo, proto, nodes): 842 def branches(repo, proto, nodes):
864 nodes = decodelist(nodes) 843 nodes = wireprototypes.decodelist(nodes)
865 r = [] 844 r = []
866 for b in repo.branches(nodes): 845 for b in repo.branches(nodes):
867 r.append(encodelist(b) + "\n") 846 r.append(wireprototypes.encodelist(b) + "\n")
868 847
869 return wireprototypes.bytesresponse(''.join(r)) 848 return wireprototypes.bytesresponse(''.join(r))
870 849
871 @wireprotocommand('clonebundles', '', permission='pull', 850 @wireprotocommand('clonebundles', '', permission='pull',
872 transportpolicy=POLICY_V1_ONLY) 851 transportpolicy=POLICY_V1_ONLY)
929 return wireprototypes.bytesresponse(' '.join(sorted(caps))) 908 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
930 909
931 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY, 910 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
932 permission='pull') 911 permission='pull')
933 def changegroup(repo, proto, roots): 912 def changegroup(repo, proto, roots):
934 nodes = decodelist(roots) 913 nodes = wireprototypes.decodelist(roots)
935 outgoing = discovery.outgoing(repo, missingroots=nodes, 914 outgoing = discovery.outgoing(repo, missingroots=nodes,
936 missingheads=repo.heads()) 915 missingheads=repo.heads())
937 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') 916 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
938 gen = iter(lambda: cg.read(32768), '') 917 gen = iter(lambda: cg.read(32768), '')
939 return wireprototypes.streamres(gen=gen) 918 return wireprototypes.streamres(gen=gen)
940 919
941 @wireprotocommand('changegroupsubset', 'bases heads', 920 @wireprotocommand('changegroupsubset', 'bases heads',
942 transportpolicy=POLICY_V1_ONLY, 921 transportpolicy=POLICY_V1_ONLY,
943 permission='pull') 922 permission='pull')
944 def changegroupsubset(repo, proto, bases, heads): 923 def changegroupsubset(repo, proto, bases, heads):
945 bases = decodelist(bases) 924 bases = wireprototypes.decodelist(bases)
946 heads = decodelist(heads) 925 heads = wireprototypes.decodelist(heads)
947 outgoing = discovery.outgoing(repo, missingroots=bases, 926 outgoing = discovery.outgoing(repo, missingroots=bases,
948 missingheads=heads) 927 missingheads=heads)
949 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve') 928 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
950 gen = iter(lambda: cg.read(32768), '') 929 gen = iter(lambda: cg.read(32768), '')
951 return wireprototypes.streamres(gen=gen) 930 return wireprototypes.streamres(gen=gen)
1027 def getbundle(repo, proto, others): 1006 def getbundle(repo, proto, others):
1028 opts = options('getbundle', gboptsmap.keys(), others) 1007 opts = options('getbundle', gboptsmap.keys(), others)
1029 for k, v in opts.iteritems(): 1008 for k, v in opts.iteritems():
1030 keytype = gboptsmap[k] 1009 keytype = gboptsmap[k]
1031 if keytype == 'nodes': 1010 if keytype == 'nodes':
1032 opts[k] = decodelist(v) 1011 opts[k] = wireprototypes.decodelist(v)
1033 elif keytype == 'csv': 1012 elif keytype == 'csv':
1034 opts[k] = list(v.split(',')) 1013 opts[k] = list(v.split(','))
1035 elif keytype == 'scsv': 1014 elif keytype == 'scsv':
1036 opts[k] = set(v.split(',')) 1015 opts[k] = set(v.split(','))
1037 elif keytype == 'boolean': 1016 elif keytype == 'boolean':
1099 gen=chunks, prefer_uncompressed=not prefercompressed) 1078 gen=chunks, prefer_uncompressed=not prefercompressed)
1100 1079
1101 @wireprotocommand('heads', permission='pull', transportpolicy=POLICY_V1_ONLY) 1080 @wireprotocommand('heads', permission='pull', transportpolicy=POLICY_V1_ONLY)
1102 def heads(repo, proto): 1081 def heads(repo, proto):
1103 h = repo.heads() 1082 h = repo.heads()
1104 return wireprototypes.bytesresponse(encodelist(h) + '\n') 1083 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
1105 1084
1106 @wireprotocommand('hello', permission='pull', transportpolicy=POLICY_V1_ONLY) 1085 @wireprotocommand('hello', permission='pull', transportpolicy=POLICY_V1_ONLY)
1107 def hello(repo, proto): 1086 def hello(repo, proto):
1108 """Called as part of SSH handshake to obtain server info. 1087 """Called as part of SSH handshake to obtain server info.
1109 1088
1138 return wireprototypes.bytesresponse('%d %s\n' % (success, r)) 1117 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1139 1118
1140 @wireprotocommand('known', 'nodes *', permission='pull', 1119 @wireprotocommand('known', 'nodes *', permission='pull',
1141 transportpolicy=POLICY_V1_ONLY) 1120 transportpolicy=POLICY_V1_ONLY)
1142 def known(repo, proto, nodes, others): 1121 def known(repo, proto, nodes, others):
1143 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes))) 1122 v = ''.join(b and '1' or '0'
1123 for b in repo.known(wireprototypes.decodelist(nodes)))
1144 return wireprototypes.bytesresponse(v) 1124 return wireprototypes.bytesresponse(v)
1145 1125
1146 @wireprotocommand('protocaps', 'caps', permission='pull', 1126 @wireprotocommand('protocaps', 'caps', permission='pull',
1147 transportpolicy=POLICY_V1_ONLY) 1127 transportpolicy=POLICY_V1_ONLY)
1148 def protocaps(repo, proto, caps): 1128 def protocaps(repo, proto, caps):
1183 streamclone.generatev1wireproto(repo)) 1163 streamclone.generatev1wireproto(repo))
1184 1164
1185 @wireprotocommand('unbundle', 'heads', permission='push', 1165 @wireprotocommand('unbundle', 'heads', permission='push',
1186 transportpolicy=POLICY_V1_ONLY) 1166 transportpolicy=POLICY_V1_ONLY)
1187 def unbundle(repo, proto, heads): 1167 def unbundle(repo, proto, heads):
1188 their_heads = decodelist(heads) 1168 their_heads = wireprototypes.decodelist(heads)
1189 1169
1190 with proto.mayberedirectstdio() as output: 1170 with proto.mayberedirectstdio() as output:
1191 try: 1171 try:
1192 exchange.check_heads(repo, their_heads, 'preparing changes') 1172 exchange.check_heads(repo, their_heads, 'preparing changes')
1193 cleanup = lambda: None 1173 cleanup = lambda: None