tests/test-wireproto-serverreactor.py
changeset 37542 1ec5ce21cb46
parent 37476 e9dea82ea1f3
child 37682 cb71e0f9ac6f
equal deleted inserted replaced
37541:3e5e37204b32 37542:1ec5ce21cb46
    36     """Generate frames to run a command and send them to a reactor."""
    36     """Generate frames to run a command and send them to a reactor."""
    37     return sendframes(reactor,
    37     return sendframes(reactor,
    38                       framing.createcommandframes(stream, rid, cmd, args,
    38                       framing.createcommandframes(stream, rid, cmd, args,
    39                                                   datafh))
    39                                                   datafh))
    40 
    40 
    41 class FrameHumanStringTests(unittest.TestCase):
       
    42     def testbasic(self):
       
    43         self.assertEqual(ffs(b'1 1 0 1 0 '),
       
    44                          b'\x00\x00\x00\x01\x00\x01\x00\x10')
       
    45 
       
    46         self.assertEqual(ffs(b'2 4 0 1 0 '),
       
    47                          b'\x00\x00\x00\x02\x00\x04\x00\x10')
       
    48 
       
    49         self.assertEqual(ffs(b'2 4 0 1 0 foo'),
       
    50                          b'\x03\x00\x00\x02\x00\x04\x00\x10foo')
       
    51 
       
    52     def testcborint(self):
       
    53         self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'),
       
    54                          b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f')
       
    55 
       
    56         self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'),
       
    57                          b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*')
       
    58 
       
    59         self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'),
       
    60                          b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a'
       
    61                          b'\x00\x10\x00\x00')
       
    62 
       
    63         self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'),
       
    64                          b'\x01\x00\x00\x01\x00\x01\x00\x10\x00')
       
    65 
       
    66         self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'),
       
    67                          b'\x01\x00\x00\x01\x00\x01\x00\x10 ')
       
    68 
       
    69         self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'),
       
    70                          b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r')
       
    71 
       
    72     def testcborstrings(self):
       
    73         self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"),
       
    74                          b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo')
       
    75 
       
    76         self.assertEqual(ffs(b"1 1 0 1 0 cbor:u'foo'"),
       
    77                          b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo')
       
    78 
       
    79     def testcborlists(self):
       
    80         self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"),
       
    81                          b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4'
       
    82                          b'\x18*Cfoo')
       
    83 
       
    84     def testcbordicts(self):
       
    85         self.assertEqual(ffs(b"1 1 0 1 0 "
       
    86                              b"cbor:{b'foo': b'val1', b'bar': b'val2'}"),
       
    87                          b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2'
       
    88                          b'CbarDval2CfooDval1')
       
    89 
       
    90 class FrameTests(unittest.TestCase):
       
    91     def testdataexactframesize(self):
       
    92         data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
       
    93 
       
    94         stream = framing.stream(1)
       
    95         frames = list(framing.createcommandframes(stream, 1, b'command',
       
    96                                                   {}, data))
       
    97         self.assertEqual(frames, [
       
    98             ffs(b'1 1 stream-begin command-request new|have-data '
       
    99                 b"cbor:{b'name': b'command'}"),
       
   100             ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
       
   101             ffs(b'1 1 0 command-data eos ')
       
   102         ])
       
   103 
       
   104     def testdatamultipleframes(self):
       
   105         data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
       
   106 
       
   107         stream = framing.stream(1)
       
   108         frames = list(framing.createcommandframes(stream, 1, b'command', {},
       
   109                                                   data))
       
   110         self.assertEqual(frames, [
       
   111             ffs(b'1 1 stream-begin command-request new|have-data '
       
   112                 b"cbor:{b'name': b'command'}"),
       
   113             ffs(b'1 1 0 command-data continuation %s' % (
       
   114                 b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
       
   115             ffs(b'1 1 0 command-data eos x'),
       
   116         ])
       
   117 
       
   118     def testargsanddata(self):
       
   119         data = util.bytesio(b'x' * 100)
       
   120 
       
   121         stream = framing.stream(1)
       
   122         frames = list(framing.createcommandframes(stream, 1, b'command', {
       
   123             b'key1': b'key1value',
       
   124             b'key2': b'key2value',
       
   125             b'key3': b'key3value',
       
   126         }, data))
       
   127 
       
   128         self.assertEqual(frames, [
       
   129             ffs(b'1 1 stream-begin command-request new|have-data '
       
   130                 b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
       
   131                 b"b'key2': b'key2value', b'key3': b'key3value'}}"),
       
   132             ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
       
   133         ])
       
   134 
       
   135     def testtextoutputformattingstringtype(self):
       
   136         """Formatting string must be bytes."""
       
   137         with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
       
   138             list(framing.createtextoutputframe(None, 1, [
       
   139                 (b'foo'.decode('ascii'), [], [])]))
       
   140 
       
   141     def testtextoutputargumentbytes(self):
       
   142         with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
       
   143             list(framing.createtextoutputframe(None, 1, [
       
   144                 (b'foo', [b'foo'.decode('ascii')], [])]))
       
   145 
       
   146     def testtextoutputlabelbytes(self):
       
   147         with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
       
   148             list(framing.createtextoutputframe(None, 1, [
       
   149                 (b'foo', [], [b'foo'.decode('ascii')])]))
       
   150 
       
   151     def testtextoutput1simpleatom(self):
       
   152         stream = framing.stream(1)
       
   153         val = list(framing.createtextoutputframe(stream, 1, [
       
   154             (b'foo', [], [])]))
       
   155 
       
   156         self.assertEqual(val, [
       
   157             ffs(b'1 1 stream-begin text-output 0 '
       
   158                 b"cbor:[{b'msg': b'foo'}]"),
       
   159         ])
       
   160 
       
   161     def testtextoutput2simpleatoms(self):
       
   162         stream = framing.stream(1)
       
   163         val = list(framing.createtextoutputframe(stream, 1, [
       
   164             (b'foo', [], []),
       
   165             (b'bar', [], []),
       
   166         ]))
       
   167 
       
   168         self.assertEqual(val, [
       
   169             ffs(b'1 1 stream-begin text-output 0 '
       
   170                 b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]")
       
   171         ])
       
   172 
       
   173     def testtextoutput1arg(self):
       
   174         stream = framing.stream(1)
       
   175         val = list(framing.createtextoutputframe(stream, 1, [
       
   176             (b'foo %s', [b'val1'], []),
       
   177         ]))
       
   178 
       
   179         self.assertEqual(val, [
       
   180             ffs(b'1 1 stream-begin text-output 0 '
       
   181                 b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]")
       
   182         ])
       
   183 
       
   184     def testtextoutput2arg(self):
       
   185         stream = framing.stream(1)
       
   186         val = list(framing.createtextoutputframe(stream, 1, [
       
   187             (b'foo %s %s', [b'val', b'value'], []),
       
   188         ]))
       
   189 
       
   190         self.assertEqual(val, [
       
   191             ffs(b'1 1 stream-begin text-output 0 '
       
   192                 b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]")
       
   193         ])
       
   194 
       
   195     def testtextoutput1label(self):
       
   196         stream = framing.stream(1)
       
   197         val = list(framing.createtextoutputframe(stream, 1, [
       
   198             (b'foo', [], [b'label']),
       
   199         ]))
       
   200 
       
   201         self.assertEqual(val, [
       
   202             ffs(b'1 1 stream-begin text-output 0 '
       
   203                 b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]")
       
   204         ])
       
   205 
       
   206     def testargandlabel(self):
       
   207         stream = framing.stream(1)
       
   208         val = list(framing.createtextoutputframe(stream, 1, [
       
   209             (b'foo %s', [b'arg'], [b'label']),
       
   210         ]))
       
   211 
       
   212         self.assertEqual(val, [
       
   213             ffs(b'1 1 stream-begin text-output 0 '
       
   214                 b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
       
   215                 b"b'labels': [b'label']}]")
       
   216         ])
       
   217 
    41 
   218 class ServerReactorTests(unittest.TestCase):
    42 class ServerReactorTests(unittest.TestCase):
   219     def _sendsingleframe(self, reactor, f):
    43     def _sendsingleframe(self, reactor, f):
   220         results = list(sendframes(reactor, [f]))
    44         results = list(sendframes(reactor, [f]))
   221         self.assertEqual(len(results), 1)
    45         self.assertEqual(len(results), 1)