comparison tests/test-wireproto-clientreactor.py @ 43076:2372284d9457

formatting: blacken the codebase This is using my patch to black (https://github.com/psf/black/pull/826) so we don't un-wrap collection literals. Done with: hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S # skip-blame mass-reformatting only # no-check-commit reformats foo_bar functions Differential Revision: https://phab.mercurial-scm.org/D6971
author Augie Fackler <augie@google.com>
date Sun, 06 Oct 2019 09:45:02 -0400
parents 685cf59a134f
children 89a2afe31e82
comparison
equal deleted inserted replaced
43075:57875cf423c9 43076:2372284d9457
7 from mercurial import ( 7 from mercurial import (
8 error, 8 error,
9 ui as uimod, 9 ui as uimod,
10 wireprotoframing as framing, 10 wireprotoframing as framing,
11 ) 11 )
12 from mercurial.utils import ( 12 from mercurial.utils import cborutil
13 cborutil,
14 )
15 13
16 try: 14 try:
17 from mercurial import zstd 15 from mercurial import zstd
16
18 zstd.__version__ 17 zstd.__version__
19 except ImportError: 18 except ImportError:
20 zstd = None 19 zstd = None
21 20
22 ffs = framing.makeframefromhumanstring 21 ffs = framing.makeframefromhumanstring
23 22
24 globalui = uimod.ui() 23 globalui = uimod.ui()
25 24
25
26 def sendframe(reactor, frame): 26 def sendframe(reactor, frame):
27 """Send a frame bytearray to a reactor.""" 27 """Send a frame bytearray to a reactor."""
28 header = framing.parseheader(frame) 28 header = framing.parseheader(frame)
29 payload = frame[framing.FRAME_HEADER_SIZE:] 29 payload = frame[framing.FRAME_HEADER_SIZE :]
30 assert len(payload) == header.length 30 assert len(payload) == header.length
31 31
32 return reactor.onframerecv(framing.frame(header.requestid, 32 return reactor.onframerecv(
33 header.streamid, 33 framing.frame(
34 header.streamflags, 34 header.requestid,
35 header.typeid, 35 header.streamid,
36 header.flags, 36 header.streamflags,
37 payload)) 37 header.typeid,
38 header.flags,
39 payload,
40 )
41 )
42
38 43
39 class SingleSendTests(unittest.TestCase): 44 class SingleSendTests(unittest.TestCase):
40 """A reactor that can only send once rejects subsequent sends.""" 45 """A reactor that can only send once rejects subsequent sends."""
41 46
42 if not getattr(unittest.TestCase, 'assertRaisesRegex', False): 47 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
43 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks 48 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
44 # the regex version. 49 # the regex version.
45 assertRaisesRegex = (# camelcase-required 50 assertRaisesRegex = ( # camelcase-required
46 unittest.TestCase.assertRaisesRegexp) 51 unittest.TestCase.assertRaisesRegexp
52 )
47 53
48 def testbasic(self): 54 def testbasic(self):
49 reactor = framing.clientreactor(globalui, 55 reactor = framing.clientreactor(
50 hasmultiplesend=False, 56 globalui, hasmultiplesend=False, buffersends=True
51 buffersends=True) 57 )
52 58
53 request, action, meta = reactor.callcommand(b'foo', {}) 59 request, action, meta = reactor.callcommand(b'foo', {})
54 self.assertEqual(request.state, b'pending') 60 self.assertEqual(request.state, b'pending')
55 self.assertEqual(action, b'noop') 61 self.assertEqual(action, b'noop')
56 62
60 for frame in meta[b'framegen']: 66 for frame in meta[b'framegen']:
61 self.assertEqual(request.state, b'sending') 67 self.assertEqual(request.state, b'sending')
62 68
63 self.assertEqual(request.state, b'sent') 69 self.assertEqual(request.state, b'sent')
64 70
65 with self.assertRaisesRegex(error.ProgrammingError, 71 with self.assertRaisesRegex(
66 'cannot issue new commands'): 72 error.ProgrammingError, 'cannot issue new commands'
73 ):
67 reactor.callcommand(b'foo', {}) 74 reactor.callcommand(b'foo', {})
68 75
69 with self.assertRaisesRegex(error.ProgrammingError, 76 with self.assertRaisesRegex(
70 'cannot issue new commands'): 77 error.ProgrammingError, 'cannot issue new commands'
78 ):
71 reactor.callcommand(b'foo', {}) 79 reactor.callcommand(b'foo', {})
80
72 81
73 class NoBufferTests(unittest.TestCase): 82 class NoBufferTests(unittest.TestCase):
74 """A reactor without send buffering sends requests immediately.""" 83 """A reactor without send buffering sends requests immediately."""
84
75 def testbasic(self): 85 def testbasic(self):
76 reactor = framing.clientreactor(globalui, 86 reactor = framing.clientreactor(
77 hasmultiplesend=True, 87 globalui, hasmultiplesend=True, buffersends=False
78 buffersends=False) 88 )
79 89
80 request, action, meta = reactor.callcommand(b'command1', {}) 90 request, action, meta = reactor.callcommand(b'command1', {})
81 self.assertEqual(request.requestid, 1) 91 self.assertEqual(request.requestid, 1)
82 self.assertEqual(action, b'sendframes') 92 self.assertEqual(action, b'sendframes')
83 93
99 for frame in meta[b'framegen']: 109 for frame in meta[b'framegen']:
100 self.assertEqual(request.state, b'sending') 110 self.assertEqual(request.state, b'sending')
101 111
102 self.assertEqual(request.state, b'sent') 112 self.assertEqual(request.state, b'sent')
103 113
114
104 class BadFrameRecvTests(unittest.TestCase): 115 class BadFrameRecvTests(unittest.TestCase):
105 if not getattr(unittest.TestCase, 'assertRaisesRegex', False): 116 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
106 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks 117 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
107 # the regex version. 118 # the regex version.
108 assertRaisesRegex = (# camelcase-required 119 assertRaisesRegex = ( # camelcase-required
109 unittest.TestCase.assertRaisesRegexp) 120 unittest.TestCase.assertRaisesRegexp
121 )
110 122
111 def testoddstream(self): 123 def testoddstream(self):
112 reactor = framing.clientreactor(globalui) 124 reactor = framing.clientreactor(globalui)
113 125
114 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo')) 126 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
115 self.assertEqual(action, b'error') 127 self.assertEqual(action, b'error')
116 self.assertEqual(meta[b'message'], 128 self.assertEqual(
117 b'received frame with odd numbered stream ID: 1') 129 meta[b'message'], b'received frame with odd numbered stream ID: 1'
130 )
118 131
119 def testunknownstream(self): 132 def testunknownstream(self):
120 reactor = framing.clientreactor(globalui) 133 reactor = framing.clientreactor(globalui)
121 134
122 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo')) 135 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
123 self.assertEqual(action, b'error') 136 self.assertEqual(action, b'error')
124 self.assertEqual(meta[b'message'], 137 self.assertEqual(
125 b'received frame on unknown stream without beginning ' 138 meta[b'message'],
126 b'of stream flag set') 139 b'received frame on unknown stream without beginning '
140 b'of stream flag set',
141 )
127 142
128 def testunhandledframetype(self): 143 def testunhandledframetype(self):
129 reactor = framing.clientreactor(globalui, buffersends=False) 144 reactor = framing.clientreactor(globalui, buffersends=False)
130 145
131 request, action, meta = reactor.callcommand(b'foo', {}) 146 request, action, meta = reactor.callcommand(b'foo', {})
132 for frame in meta[b'framegen']: 147 for frame in meta[b'framegen']:
133 pass 148 pass
134 149
135 with self.assertRaisesRegex(error.ProgrammingError, 150 with self.assertRaisesRegex(
136 'unhandled frame type'): 151 error.ProgrammingError, 'unhandled frame type'
152 ):
137 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo')) 153 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
154
138 155
139 class StreamTests(unittest.TestCase): 156 class StreamTests(unittest.TestCase):
140 def testmultipleresponseframes(self): 157 def testmultipleresponseframes(self):
141 reactor = framing.clientreactor(globalui, buffersends=False) 158 reactor = framing.clientreactor(globalui, buffersends=False)
142 159
146 for f in meta[b'framegen']: 163 for f in meta[b'framegen']:
147 pass 164 pass
148 165
149 action, meta = sendframe( 166 action, meta = sendframe(
150 reactor, 167 reactor,
151 ffs(b'%d 0 stream-begin command-response 0 foo' % 168 ffs(
152 request.requestid)) 169 b'%d 0 stream-begin command-response 0 foo' % request.requestid
153 self.assertEqual(action, b'responsedata') 170 ),
154 171 )
155 action, meta = sendframe( 172 self.assertEqual(action, b'responsedata')
156 reactor, 173
157 ffs(b'%d 0 0 command-response eos bar' % request.requestid)) 174 action, meta = sendframe(
158 self.assertEqual(action, b'responsedata') 175 reactor, ffs(b'%d 0 0 command-response eos bar' % request.requestid)
176 )
177 self.assertEqual(action, b'responsedata')
178
159 179
160 class RedirectTests(unittest.TestCase): 180 class RedirectTests(unittest.TestCase):
161 def testredirect(self): 181 def testredirect(self):
162 reactor = framing.clientreactor(globalui, buffersends=False) 182 reactor = framing.clientreactor(globalui, buffersends=False)
163 183
165 b'targets': [b'a', b'b'], 185 b'targets': [b'a', b'b'],
166 b'hashes': [b'sha256'], 186 b'hashes': [b'sha256'],
167 } 187 }
168 188
169 request, action, meta = reactor.callcommand( 189 request, action, meta = reactor.callcommand(
170 b'foo', {}, redirect=redirect) 190 b'foo', {}, redirect=redirect
191 )
171 192
172 self.assertEqual(action, b'sendframes') 193 self.assertEqual(action, b'sendframes')
173 194
174 frames = list(meta[b'framegen']) 195 frames = list(meta[b'framegen'])
175 self.assertEqual(len(frames), 1) 196 self.assertEqual(len(frames), 1)
176 197
177 self.assertEqual(frames[0], 198 self.assertEqual(
178 ffs(b'1 1 stream-begin command-request new ' 199 frames[0],
179 b"cbor:{b'name': b'foo', " 200 ffs(
180 b"b'redirect': {b'targets': [b'a', b'b'], " 201 b'1 1 stream-begin command-request new '
181 b"b'hashes': [b'sha256']}}")) 202 b"cbor:{b'name': b'foo', "
203 b"b'redirect': {b'targets': [b'a', b'b'], "
204 b"b'hashes': [b'sha256']}}"
205 ),
206 )
207
182 208
183 class StreamSettingsTests(unittest.TestCase): 209 class StreamSettingsTests(unittest.TestCase):
184 def testnoflags(self): 210 def testnoflags(self):
185 reactor = framing.clientreactor(globalui, buffersends=False) 211 reactor = framing.clientreactor(globalui, buffersends=False)
186 212
187 request, action, meta = reactor.callcommand(b'foo', {}) 213 request, action, meta = reactor.callcommand(b'foo', {})
188 for f in meta[b'framegen']: 214 for f in meta[b'framegen']:
189 pass 215 pass
190 216
191 action, meta = sendframe(reactor, 217 action, meta = sendframe(
192 ffs(b'1 2 stream-begin stream-settings 0 ')) 218 reactor, ffs(b'1 2 stream-begin stream-settings 0 ')
219 )
193 220
194 self.assertEqual(action, b'error') 221 self.assertEqual(action, b'error')
195 self.assertEqual(meta, { 222 self.assertEqual(
196 b'message': b'stream encoding settings frame must have ' 223 meta,
197 b'continuation or end of stream flag set', 224 {
198 }) 225 b'message': b'stream encoding settings frame must have '
226 b'continuation or end of stream flag set',
227 },
228 )
199 229
200 def testconflictflags(self): 230 def testconflictflags(self):
201 reactor = framing.clientreactor(globalui, buffersends=False) 231 reactor = framing.clientreactor(globalui, buffersends=False)
202 232
203 request, action, meta = reactor.callcommand(b'foo', {}) 233 request, action, meta = reactor.callcommand(b'foo', {})
204 for f in meta[b'framegen']: 234 for f in meta[b'framegen']:
205 pass 235 pass
206 236
207 action, meta = sendframe(reactor, 237 action, meta = sendframe(
208 ffs(b'1 2 stream-begin stream-settings continuation|eos ')) 238 reactor, ffs(b'1 2 stream-begin stream-settings continuation|eos ')
239 )
209 240
210 self.assertEqual(action, b'error') 241 self.assertEqual(action, b'error')
211 self.assertEqual(meta, { 242 self.assertEqual(
212 b'message': b'stream encoding settings frame cannot have both ' 243 meta,
213 b'continuation and end of stream flags set', 244 {
214 }) 245 b'message': b'stream encoding settings frame cannot have both '
246 b'continuation and end of stream flags set',
247 },
248 )
215 249
216 def testemptypayload(self): 250 def testemptypayload(self):
217 reactor = framing.clientreactor(globalui, buffersends=False) 251 reactor = framing.clientreactor(globalui, buffersends=False)
218 252
219 request, action, meta = reactor.callcommand(b'foo', {}) 253 request, action, meta = reactor.callcommand(b'foo', {})
220 for f in meta[b'framegen']: 254 for f in meta[b'framegen']:
221 pass 255 pass
222 256
223 action, meta = sendframe(reactor, 257 action, meta = sendframe(
224 ffs(b'1 2 stream-begin stream-settings eos ')) 258 reactor, ffs(b'1 2 stream-begin stream-settings eos ')
259 )
225 260
226 self.assertEqual(action, b'error') 261 self.assertEqual(action, b'error')
227 self.assertEqual(meta, { 262 self.assertEqual(
228 b'message': b'stream encoding settings frame did not contain ' 263 meta,
229 b'CBOR data' 264 {
230 }) 265 b'message': b'stream encoding settings frame did not contain '
266 b'CBOR data'
267 },
268 )
231 269
232 def testbadcbor(self): 270 def testbadcbor(self):
233 reactor = framing.clientreactor(globalui, buffersends=False) 271 reactor = framing.clientreactor(globalui, buffersends=False)
234 272
235 request, action, meta = reactor.callcommand(b'foo', {}) 273 request, action, meta = reactor.callcommand(b'foo', {})
236 for f in meta[b'framegen']: 274 for f in meta[b'framegen']:
237 pass 275 pass
238 276
239 action, meta = sendframe(reactor, 277 action, meta = sendframe(
240 ffs(b'1 2 stream-begin stream-settings eos badvalue')) 278 reactor, ffs(b'1 2 stream-begin stream-settings eos badvalue')
279 )
241 280
242 self.assertEqual(action, b'error') 281 self.assertEqual(action, b'error')
243 282
244 def testsingleobject(self): 283 def testsingleobject(self):
245 reactor = framing.clientreactor(globalui, buffersends=False) 284 reactor = framing.clientreactor(globalui, buffersends=False)
246 285
247 request, action, meta = reactor.callcommand(b'foo', {}) 286 request, action, meta = reactor.callcommand(b'foo', {})
248 for f in meta[b'framegen']: 287 for f in meta[b'framegen']:
249 pass 288 pass
250 289
251 action, meta = sendframe(reactor, 290 action, meta = sendframe(
252 ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"')) 291 reactor,
292 ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'),
293 )
253 294
254 self.assertEqual(action, b'noop') 295 self.assertEqual(action, b'noop')
255 self.assertEqual(meta, {}) 296 self.assertEqual(meta, {})
256 297
257 def testmultipleobjects(self): 298 def testmultipleobjects(self):
259 300
260 request, action, meta = reactor.callcommand(b'foo', {}) 301 request, action, meta = reactor.callcommand(b'foo', {})
261 for f in meta[b'framegen']: 302 for f in meta[b'framegen']:
262 pass 303 pass
263 304
264 data = b''.join([ 305 data = b''.join(
265 b''.join(cborutil.streamencode(b'identity')), 306 [
266 b''.join(cborutil.streamencode({b'foo', b'bar'})), 307 b''.join(cborutil.streamencode(b'identity')),
267 ]) 308 b''.join(cborutil.streamencode({b'foo', b'bar'})),
268 309 ]
269 action, meta = sendframe(reactor, 310 )
270 ffs(b'1 2 stream-begin stream-settings eos %s' % data)) 311
312 action, meta = sendframe(
313 reactor, ffs(b'1 2 stream-begin stream-settings eos %s' % data)
314 )
271 315
272 self.assertEqual(action, b'error') 316 self.assertEqual(action, b'error')
273 self.assertEqual(meta, { 317 self.assertEqual(
274 b'message': b'error setting stream decoder: identity decoder ' 318 meta,
275 b'received unexpected additional values', 319 {
276 }) 320 b'message': b'error setting stream decoder: identity decoder '
321 b'received unexpected additional values',
322 },
323 )
277 324
278 def testmultipleframes(self): 325 def testmultipleframes(self):
279 reactor = framing.clientreactor(globalui, buffersends=False) 326 reactor = framing.clientreactor(globalui, buffersends=False)
280 327
281 request, action, meta = reactor.callcommand(b'foo', {}) 328 request, action, meta = reactor.callcommand(b'foo', {})
282 for f in meta[b'framegen']: 329 for f in meta[b'framegen']:
283 pass 330 pass
284 331
285 data = b''.join(cborutil.streamencode(b'identity')) 332 data = b''.join(cborutil.streamencode(b'identity'))
286 333
287 action, meta = sendframe(reactor, 334 action, meta = sendframe(
288 ffs(b'1 2 stream-begin stream-settings continuation %s' % 335 reactor,
289 data[0:3])) 336 ffs(
290 337 b'1 2 stream-begin stream-settings continuation %s' % data[0:3]
291 self.assertEqual(action, b'noop') 338 ),
292 self.assertEqual(meta, {}) 339 )
293 340
294 action, meta = sendframe(reactor, 341 self.assertEqual(action, b'noop')
295 ffs(b'1 2 0 stream-settings eos %s' % data[3:])) 342 self.assertEqual(meta, {})
343
344 action, meta = sendframe(
345 reactor, ffs(b'1 2 0 stream-settings eos %s' % data[3:])
346 )
296 347
297 self.assertEqual(action, b'noop') 348 self.assertEqual(action, b'noop')
298 self.assertEqual(meta, {}) 349 self.assertEqual(meta, {})
299 350
300 def testinvalidencoder(self): 351 def testinvalidencoder(self):
302 353
303 request, action, meta = reactor.callcommand(b'foo', {}) 354 request, action, meta = reactor.callcommand(b'foo', {})
304 for f in meta[b'framegen']: 355 for f in meta[b'framegen']:
305 pass 356 pass
306 357
307 action, meta = sendframe(reactor, 358 action, meta = sendframe(
308 ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"')) 359 reactor,
360 ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'),
361 )
309 362
310 self.assertEqual(action, b'error') 363 self.assertEqual(action, b'error')
311 self.assertEqual(meta, { 364 self.assertEqual(
312 b'message': b'error setting stream decoder: unknown stream ' 365 meta,
313 b'decoder: badvalue', 366 {
314 }) 367 b'message': b'error setting stream decoder: unknown stream '
368 b'decoder: badvalue',
369 },
370 )
315 371
316 def testzlibencoding(self): 372 def testzlibencoding(self):
317 reactor = framing.clientreactor(globalui, buffersends=False) 373 reactor = framing.clientreactor(globalui, buffersends=False)
318 374
319 request, action, meta = reactor.callcommand(b'foo', {}) 375 request, action, meta = reactor.callcommand(b'foo', {})
320 for f in meta[b'framegen']: 376 for f in meta[b'framegen']:
321 pass 377 pass
322 378
323 action, meta = sendframe(reactor, 379 action, meta = sendframe(
324 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % 380 reactor,
325 request.requestid)) 381 ffs(
382 b'%d 2 stream-begin stream-settings eos cbor:b"zlib"'
383 % request.requestid
384 ),
385 )
326 386
327 self.assertEqual(action, b'noop') 387 self.assertEqual(action, b'noop')
328 self.assertEqual(meta, {}) 388 self.assertEqual(meta, {})
329 389
330 result = { 390 result = {
333 encoded = b''.join(cborutil.streamencode(result)) 393 encoded = b''.join(cborutil.streamencode(result))
334 394
335 compressed = zlib.compress(encoded) 395 compressed = zlib.compress(encoded)
336 self.assertEqual(zlib.decompress(compressed), encoded) 396 self.assertEqual(zlib.decompress(compressed), encoded)
337 397
338 action, meta = sendframe(reactor, 398 action, meta = sendframe(
339 ffs(b'%d 2 encoded command-response eos %s' % 399 reactor,
340 (request.requestid, compressed))) 400 ffs(
401 b'%d 2 encoded command-response eos %s'
402 % (request.requestid, compressed)
403 ),
404 )
341 405
342 self.assertEqual(action, b'responsedata') 406 self.assertEqual(action, b'responsedata')
343 self.assertEqual(meta[b'data'], encoded) 407 self.assertEqual(meta[b'data'], encoded)
344 408
345 def testzlibencodingsinglebyteframes(self): 409 def testzlibencodingsinglebyteframes(self):
347 411
348 request, action, meta = reactor.callcommand(b'foo', {}) 412 request, action, meta = reactor.callcommand(b'foo', {})
349 for f in meta[b'framegen']: 413 for f in meta[b'framegen']:
350 pass 414 pass
351 415
352 action, meta = sendframe(reactor, 416 action, meta = sendframe(
353 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % 417 reactor,
354 request.requestid)) 418 ffs(
419 b'%d 2 stream-begin stream-settings eos cbor:b"zlib"'
420 % request.requestid
421 ),
422 )
355 423
356 self.assertEqual(action, b'noop') 424 self.assertEqual(action, b'noop')
357 self.assertEqual(meta, {}) 425 self.assertEqual(meta, {})
358 426
359 result = { 427 result = {
365 self.assertEqual(zlib.decompress(compressed), encoded) 433 self.assertEqual(zlib.decompress(compressed), encoded)
366 434
367 chunks = [] 435 chunks = []
368 436
369 for i in range(len(compressed)): 437 for i in range(len(compressed)):
370 char = compressed[i:i + 1] 438 char = compressed[i : i + 1]
371 if char == b'\\': 439 if char == b'\\':
372 char = b'\\\\' 440 char = b'\\\\'
373 action, meta = sendframe(reactor, 441 action, meta = sendframe(
374 ffs(b'%d 2 encoded command-response continuation %s' % 442 reactor,
375 (request.requestid, char))) 443 ffs(
444 b'%d 2 encoded command-response continuation %s'
445 % (request.requestid, char)
446 ),
447 )
376 448
377 self.assertEqual(action, b'responsedata') 449 self.assertEqual(action, b'responsedata')
378 chunks.append(meta[b'data']) 450 chunks.append(meta[b'data'])
379 self.assertTrue(meta[b'expectmore']) 451 self.assertTrue(meta[b'expectmore'])
380 self.assertFalse(meta[b'eos']) 452 self.assertFalse(meta[b'eos'])
382 # zlib will have the full data decoded at this point, even though 454 # zlib will have the full data decoded at this point, even though
383 # we haven't flushed. 455 # we haven't flushed.
384 self.assertEqual(b''.join(chunks), encoded) 456 self.assertEqual(b''.join(chunks), encoded)
385 457
386 # End the stream for good measure. 458 # End the stream for good measure.
387 action, meta = sendframe(reactor, 459 action, meta = sendframe(
388 ffs(b'%d 2 stream-end command-response eos ' % request.requestid)) 460 reactor,
461 ffs(b'%d 2 stream-end command-response eos ' % request.requestid),
462 )
389 463
390 self.assertEqual(action, b'responsedata') 464 self.assertEqual(action, b'responsedata')
391 self.assertEqual(meta[b'data'], b'') 465 self.assertEqual(meta[b'data'], b'')
392 self.assertFalse(meta[b'expectmore']) 466 self.assertFalse(meta[b'expectmore'])
393 self.assertTrue(meta[b'eos']) 467 self.assertTrue(meta[b'eos'])
394 468
395 def testzlibmultipleresponses(self): 469 def testzlibmultipleresponses(self):
396 # We feed in zlib compressed data on the same stream but belonging to 470 # We feed in zlib compressed data on the same stream but belonging to
397 # 2 different requests. This tests our flushing behavior. 471 # 2 different requests. This tests our flushing behavior.
398 reactor = framing.clientreactor(globalui, buffersends=False, 472 reactor = framing.clientreactor(
399 hasmultiplesend=True) 473 globalui, buffersends=False, hasmultiplesend=True
474 )
400 475
401 request1, action, meta = reactor.callcommand(b'foo', {}) 476 request1, action, meta = reactor.callcommand(b'foo', {})
402 for f in meta[b'framegen']: 477 for f in meta[b'framegen']:
403 pass 478 pass
404 479
407 pass 482 pass
408 483
409 outstream = framing.outputstream(2) 484 outstream = framing.outputstream(2)
410 outstream.setencoder(globalui, b'zlib') 485 outstream.setencoder(globalui, b'zlib')
411 486
412 response1 = b''.join(cborutil.streamencode({ 487 response1 = b''.join(
413 b'status': b'ok', 488 cborutil.streamencode(
414 b'extra': b'response1' * 10, 489 {b'status': b'ok', b'extra': b'response1' * 10,}
415 })) 490 )
416 491 )
417 response2 = b''.join(cborutil.streamencode({ 492
418 b'status': b'error', 493 response2 = b''.join(
419 b'extra': b'response2' * 10, 494 cborutil.streamencode(
420 })) 495 {b'status': b'error', b'extra': b'response2' * 10,}
421 496 )
422 action, meta = sendframe(reactor, 497 )
423 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % 498
424 request1.requestid)) 499 action, meta = sendframe(
500 reactor,
501 ffs(
502 b'%d 2 stream-begin stream-settings eos cbor:b"zlib"'
503 % request1.requestid
504 ),
505 )
425 506
426 self.assertEqual(action, b'noop') 507 self.assertEqual(action, b'noop')
427 self.assertEqual(meta, {}) 508 self.assertEqual(meta, {})
428 509
429 # Feeding partial data in won't get anything useful out. 510 # Feeding partial data in won't get anything useful out.
430 action, meta = sendframe(reactor, 511 action, meta = sendframe(
431 ffs(b'%d 2 encoded command-response continuation %s' % ( 512 reactor,
432 request1.requestid, outstream.encode(response1)))) 513 ffs(
514 b'%d 2 encoded command-response continuation %s'
515 % (request1.requestid, outstream.encode(response1))
516 ),
517 )
433 self.assertEqual(action, b'responsedata') 518 self.assertEqual(action, b'responsedata')
434 self.assertEqual(meta[b'data'], b'') 519 self.assertEqual(meta[b'data'], b'')
435 520
436 # But flushing data at both ends will get our original data. 521 # But flushing data at both ends will get our original data.
437 action, meta = sendframe(reactor, 522 action, meta = sendframe(
438 ffs(b'%d 2 encoded command-response eos %s' % ( 523 reactor,
439 request1.requestid, outstream.flush()))) 524 ffs(
525 b'%d 2 encoded command-response eos %s'
526 % (request1.requestid, outstream.flush())
527 ),
528 )
440 self.assertEqual(action, b'responsedata') 529 self.assertEqual(action, b'responsedata')
441 self.assertEqual(meta[b'data'], response1) 530 self.assertEqual(meta[b'data'], response1)
442 531
443 # We should be able to reuse the compressor/decompressor for the 532 # We should be able to reuse the compressor/decompressor for the
444 # 2nd response. 533 # 2nd response.
445 action, meta = sendframe(reactor, 534 action, meta = sendframe(
446 ffs(b'%d 2 encoded command-response continuation %s' % ( 535 reactor,
447 request2.requestid, outstream.encode(response2)))) 536 ffs(
537 b'%d 2 encoded command-response continuation %s'
538 % (request2.requestid, outstream.encode(response2))
539 ),
540 )
448 self.assertEqual(action, b'responsedata') 541 self.assertEqual(action, b'responsedata')
449 self.assertEqual(meta[b'data'], b'') 542 self.assertEqual(meta[b'data'], b'')
450 543
451 action, meta = sendframe(reactor, 544 action, meta = sendframe(
452 ffs(b'%d 2 encoded command-response eos %s' % ( 545 reactor,
453 request2.requestid, outstream.flush()))) 546 ffs(
547 b'%d 2 encoded command-response eos %s'
548 % (request2.requestid, outstream.flush())
549 ),
550 )
454 self.assertEqual(action, b'responsedata') 551 self.assertEqual(action, b'responsedata')
455 self.assertEqual(meta[b'data'], response2) 552 self.assertEqual(meta[b'data'], response2)
456 553
457 @unittest.skipUnless(zstd, 'zstd not available') 554 @unittest.skipUnless(zstd, 'zstd not available')
458 def testzstd8mbencoding(self): 555 def testzstd8mbencoding(self):
460 557
461 request, action, meta = reactor.callcommand(b'foo', {}) 558 request, action, meta = reactor.callcommand(b'foo', {})
462 for f in meta[b'framegen']: 559 for f in meta[b'framegen']:
463 pass 560 pass
464 561
465 action, meta = sendframe(reactor, 562 action, meta = sendframe(
466 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % 563 reactor,
467 request.requestid)) 564 ffs(
565 b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"'
566 % request.requestid
567 ),
568 )
468 569
469 self.assertEqual(action, b'noop') 570 self.assertEqual(action, b'noop')
470 self.assertEqual(meta, {}) 571 self.assertEqual(meta, {})
471 572
472 result = { 573 result = {
474 } 575 }
475 encoded = b''.join(cborutil.streamencode(result)) 576 encoded = b''.join(cborutil.streamencode(result))
476 577
477 encoder = framing.zstd8mbencoder(globalui) 578 encoder = framing.zstd8mbencoder(globalui)
478 compressed = encoder.encode(encoded) + encoder.finish() 579 compressed = encoder.encode(encoded) + encoder.finish()
479 self.assertEqual(zstd.ZstdDecompressor().decompress( 580 self.assertEqual(
480 compressed, max_output_size=len(encoded)), encoded) 581 zstd.ZstdDecompressor().decompress(
481 582 compressed, max_output_size=len(encoded)
482 action, meta = sendframe(reactor, 583 ),
483 ffs(b'%d 2 encoded command-response eos %s' % 584 encoded,
484 (request.requestid, compressed))) 585 )
586
587 action, meta = sendframe(
588 reactor,
589 ffs(
590 b'%d 2 encoded command-response eos %s'
591 % (request.requestid, compressed)
592 ),
593 )
485 594
486 self.assertEqual(action, b'responsedata') 595 self.assertEqual(action, b'responsedata')
487 self.assertEqual(meta[b'data'], encoded) 596 self.assertEqual(meta[b'data'], encoded)
488 597
489 @unittest.skipUnless(zstd, 'zstd not available') 598 @unittest.skipUnless(zstd, 'zstd not available')
492 601
493 request, action, meta = reactor.callcommand(b'foo', {}) 602 request, action, meta = reactor.callcommand(b'foo', {})
494 for f in meta[b'framegen']: 603 for f in meta[b'framegen']:
495 pass 604 pass
496 605
497 action, meta = sendframe(reactor, 606 action, meta = sendframe(
498 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % 607 reactor,
499 request.requestid)) 608 ffs(
609 b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"'
610 % request.requestid
611 ),
612 )
500 613
501 self.assertEqual(action, b'noop') 614 self.assertEqual(action, b'noop')
502 self.assertEqual(meta, {}) 615 self.assertEqual(meta, {})
503 616
504 result = { 617 result = {
505 b'status': b'ok', 618 b'status': b'ok',
506 } 619 }
507 encoded = b''.join(cborutil.streamencode(result)) 620 encoded = b''.join(cborutil.streamencode(result))
508 621
509 compressed = zstd.ZstdCompressor().compress(encoded) 622 compressed = zstd.ZstdCompressor().compress(encoded)
510 self.assertEqual(zstd.ZstdDecompressor().decompress(compressed), 623 self.assertEqual(
511 encoded) 624 zstd.ZstdDecompressor().decompress(compressed), encoded
625 )
512 626
513 chunks = [] 627 chunks = []
514 628
515 for i in range(len(compressed)): 629 for i in range(len(compressed)):
516 char = compressed[i:i + 1] 630 char = compressed[i : i + 1]
517 if char == b'\\': 631 if char == b'\\':
518 char = b'\\\\' 632 char = b'\\\\'
519 action, meta = sendframe(reactor, 633 action, meta = sendframe(
520 ffs(b'%d 2 encoded command-response continuation %s' % 634 reactor,
521 (request.requestid, char))) 635 ffs(
636 b'%d 2 encoded command-response continuation %s'
637 % (request.requestid, char)
638 ),
639 )
522 640
523 self.assertEqual(action, b'responsedata') 641 self.assertEqual(action, b'responsedata')
524 chunks.append(meta[b'data']) 642 chunks.append(meta[b'data'])
525 self.assertTrue(meta[b'expectmore']) 643 self.assertTrue(meta[b'expectmore'])
526 self.assertFalse(meta[b'eos']) 644 self.assertFalse(meta[b'eos'])
527 645
528 # zstd decompressor will flush at frame boundaries. 646 # zstd decompressor will flush at frame boundaries.
529 self.assertEqual(b''.join(chunks), encoded) 647 self.assertEqual(b''.join(chunks), encoded)
530 648
531 # End the stream for good measure. 649 # End the stream for good measure.
532 action, meta = sendframe(reactor, 650 action, meta = sendframe(
533 ffs(b'%d 2 stream-end command-response eos ' % request.requestid)) 651 reactor,
652 ffs(b'%d 2 stream-end command-response eos ' % request.requestid),
653 )
534 654
535 self.assertEqual(action, b'responsedata') 655 self.assertEqual(action, b'responsedata')
536 self.assertEqual(meta[b'data'], b'') 656 self.assertEqual(meta[b'data'], b'')
537 self.assertFalse(meta[b'expectmore']) 657 self.assertFalse(meta[b'expectmore'])
538 self.assertTrue(meta[b'eos']) 658 self.assertTrue(meta[b'eos'])
539 659
540 @unittest.skipUnless(zstd, 'zstd not available') 660 @unittest.skipUnless(zstd, 'zstd not available')
541 def testzstd8mbmultipleresponses(self): 661 def testzstd8mbmultipleresponses(self):
542 # We feed in zstd compressed data on the same stream but belonging to 662 # We feed in zstd compressed data on the same stream but belonging to
543 # 2 different requests. This tests our flushing behavior. 663 # 2 different requests. This tests our flushing behavior.
544 reactor = framing.clientreactor(globalui, buffersends=False, 664 reactor = framing.clientreactor(
545 hasmultiplesend=True) 665 globalui, buffersends=False, hasmultiplesend=True
666 )
546 667
547 request1, action, meta = reactor.callcommand(b'foo', {}) 668 request1, action, meta = reactor.callcommand(b'foo', {})
548 for f in meta[b'framegen']: 669 for f in meta[b'framegen']:
549 pass 670 pass
550 671
553 pass 674 pass
554 675
555 outstream = framing.outputstream(2) 676 outstream = framing.outputstream(2)
556 outstream.setencoder(globalui, b'zstd-8mb') 677 outstream.setencoder(globalui, b'zstd-8mb')
557 678
558 response1 = b''.join(cborutil.streamencode({ 679 response1 = b''.join(
559 b'status': b'ok', 680 cborutil.streamencode(
560 b'extra': b'response1' * 10, 681 {b'status': b'ok', b'extra': b'response1' * 10,}
561 })) 682 )
562 683 )
563 response2 = b''.join(cborutil.streamencode({ 684
564 b'status': b'error', 685 response2 = b''.join(
565 b'extra': b'response2' * 10, 686 cborutil.streamencode(
566 })) 687 {b'status': b'error', b'extra': b'response2' * 10,}
567 688 )
568 action, meta = sendframe(reactor, 689 )
569 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % 690
570 request1.requestid)) 691 action, meta = sendframe(
692 reactor,
693 ffs(
694 b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"'
695 % request1.requestid
696 ),
697 )
571 698
572 self.assertEqual(action, b'noop') 699 self.assertEqual(action, b'noop')
573 self.assertEqual(meta, {}) 700 self.assertEqual(meta, {})
574 701
575 # Feeding partial data in won't get anything useful out. 702 # Feeding partial data in won't get anything useful out.
576 action, meta = sendframe(reactor, 703 action, meta = sendframe(
577 ffs(b'%d 2 encoded command-response continuation %s' % ( 704 reactor,
578 request1.requestid, outstream.encode(response1)))) 705 ffs(
706 b'%d 2 encoded command-response continuation %s'
707 % (request1.requestid, outstream.encode(response1))
708 ),
709 )
579 self.assertEqual(action, b'responsedata') 710 self.assertEqual(action, b'responsedata')
580 self.assertEqual(meta[b'data'], b'') 711 self.assertEqual(meta[b'data'], b'')
581 712
582 # But flushing data at both ends will get our original data. 713 # But flushing data at both ends will get our original data.
583 action, meta = sendframe(reactor, 714 action, meta = sendframe(
584 ffs(b'%d 2 encoded command-response eos %s' % ( 715 reactor,
585 request1.requestid, outstream.flush()))) 716 ffs(
717 b'%d 2 encoded command-response eos %s'
718 % (request1.requestid, outstream.flush())
719 ),
720 )
586 self.assertEqual(action, b'responsedata') 721 self.assertEqual(action, b'responsedata')
587 self.assertEqual(meta[b'data'], response1) 722 self.assertEqual(meta[b'data'], response1)
588 723
589 # We should be able to reuse the compressor/decompressor for the 724 # We should be able to reuse the compressor/decompressor for the
590 # 2nd response. 725 # 2nd response.
591 action, meta = sendframe(reactor, 726 action, meta = sendframe(
592 ffs(b'%d 2 encoded command-response continuation %s' % ( 727 reactor,
593 request2.requestid, outstream.encode(response2)))) 728 ffs(
729 b'%d 2 encoded command-response continuation %s'
730 % (request2.requestid, outstream.encode(response2))
731 ),
732 )
594 self.assertEqual(action, b'responsedata') 733 self.assertEqual(action, b'responsedata')
595 self.assertEqual(meta[b'data'], b'') 734 self.assertEqual(meta[b'data'], b'')
596 735
597 action, meta = sendframe(reactor, 736 action, meta = sendframe(
598 ffs(b'%d 2 encoded command-response eos %s' % ( 737 reactor,
599 request2.requestid, outstream.flush()))) 738 ffs(
739 b'%d 2 encoded command-response eos %s'
740 % (request2.requestid, outstream.flush())
741 ),
742 )
600 self.assertEqual(action, b'responsedata') 743 self.assertEqual(action, b'responsedata')
601 self.assertEqual(meta[b'data'], response2) 744 self.assertEqual(meta[b'data'], response2)
745
602 746
603 if __name__ == '__main__': 747 if __name__ == '__main__':
604 if (3, 6, 0) <= sys.version_info < (3, 6, 4): 748 if (3, 6, 0) <= sys.version_info < (3, 6, 4):
605 # Python 3.6.0 through 3.6.3 inclusive shipped with 749 # Python 3.6.0 through 3.6.3 inclusive shipped with
606 # https://bugs.python.org/issue31825 and we can't run these 750 # https://bugs.python.org/issue31825 and we can't run these
607 # tests on those specific versions of Python. Sigh. 751 # tests on those specific versions of Python. Sigh.
608 sys.exit(80) 752 sys.exit(80)
609 import silenttestrunner 753 import silenttestrunner
754
610 silenttestrunner.main(__name__) 755 silenttestrunner.main(__name__)