920 self._nextrequestid = 1 |
920 self._nextrequestid = 1 |
921 # We only support a single outgoing stream for now. |
921 # We only support a single outgoing stream for now. |
922 self._outgoingstream = stream(1) |
922 self._outgoingstream = stream(1) |
923 self._pendingrequests = collections.deque() |
923 self._pendingrequests = collections.deque() |
924 self._activerequests = {} |
924 self._activerequests = {} |
|
925 self._incomingstreams = {} |
925 |
926 |
926 def callcommand(self, name, args, datafh=None): |
927 def callcommand(self, name, args, datafh=None): |
927 """Request that a command be executed. |
928 """Request that a command be executed. |
928 |
929 |
929 Receives the command name, a dict of arguments to pass to the command, |
930 Receives the command name, a dict of arguments to pass to the command, |
1005 |
1006 |
1006 for frame in res: |
1007 for frame in res: |
1007 yield frame |
1008 yield frame |
1008 |
1009 |
1009 request.state = 'sent' |
1010 request.state = 'sent' |
|
1011 |
|
def onframerecv(self, frame):
    """Process a frame that has been received off the wire.

    Returns a 2-tuple of (action, meta) describing further action the
    caller needs to take as a result of receiving this frame.

    The action is ``'error'`` (with a ``'message'`` key in ``meta``) when
    the frame arrives on an odd numbered stream, on an unknown stream
    without the beginning of stream flag set, or for a request ID that is
    not active. Otherwise the result of the handler registered for the
    frame's type is returned.

    Raises ``error.ProgrammingError`` if the frame declares that a stream
    encoding was applied to the payload or if there is no handler for the
    frame's type.
    """
    if frame.streamid % 2:
        return 'error', {
            'message': (
                _('received frame with odd numbered stream ID: %d') %
                frame.streamid),
        }

    if frame.streamid not in self._incomingstreams:
        if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
            return 'error', {
                'message': _('received frame on unknown stream '
                             'without beginning of stream flag set'),
            }

        # Track the newly opened stream. Without this, follow-up frames on
        # the same stream would be rejected as "unknown" and the deletion
        # below would raise KeyError when the stream ends.
        self._incomingstreams[frame.streamid] = stream(frame.streamid)

    if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
        raise error.ProgrammingError('support for decoding stream '
                                     'payloads not yet implemented')

    if frame.streamflags & STREAM_FLAG_END_STREAM:
        # The peer is done with this stream; stop tracking it.
        del self._incomingstreams[frame.streamid]

    if frame.requestid not in self._activerequests:
        return 'error', {
            'message': (_('received frame for inactive request ID: %d') %
                        frame.requestid),
        }

    request = self._activerequests[frame.requestid]
    request.state = 'receiving'

    # Dispatch on frame type; only bytes responses are understood so far.
    handlers = {
        FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
    }

    meth = handlers.get(frame.typeid)
    if not meth:
        raise error.ProgrammingError('unhandled frame type: %d' %
                                     frame.typeid)

    return meth(request, frame)
|
1058 |
|
def _onbytesresponseframe(self, request, frame):
    """Handle a bytes response frame received for ``request``.

    When the frame carries the end-of-stream flag, the request is marked
    as ``'received'`` and removed from the set of active requests.

    Returns a ``('responsedata', meta)`` 2-tuple where ``meta`` holds the
    request, the masked continuation / end-of-stream / CBOR flag values
    and the frame's payload.
    """
    flags = frame.flags
    eos = flags & FLAG_BYTES_RESPONSE_EOS

    if eos:
        # Final frame of the response: retire the request.
        request.state = 'received'
        del self._activerequests[request.requestid]

    meta = {
        'request': request,
        'expectmore': flags & FLAG_BYTES_RESPONSE_CONTINUATION,
        'eos': eos,
        'cbor': flags & FLAG_BYTES_RESPONSE_CBOR,
        'data': frame.payload,
    }
    return 'responsedata', meta
|
1070 } |