9 use futures::future::IntoFuture; |
9 use futures::future::IntoFuture; |
10 use futures::{Async, Future, Poll}; |
10 use futures::{Async, Future, Poll}; |
11 use std::io; |
11 use std::io; |
12 use std::mem; |
12 use std::mem; |
13 use std::os::unix::io::AsRawFd; |
13 use std::os::unix::io::AsRawFd; |
14 use tokio_hglib::{Client, Connection}; |
|
15 use tokio_hglib::codec::ChannelMessage; |
14 use tokio_hglib::codec::ChannelMessage; |
16 use tokio_hglib::protocol::MessageLoop; |
15 use tokio_hglib::protocol::MessageLoop; |
|
16 use tokio_hglib::{Client, Connection}; |
17 |
17 |
18 use super::attachio::AttachIo; |
18 use super::attachio::AttachIo; |
19 use super::message::{self, CommandType}; |
19 use super::message::{self, CommandType}; |
20 use super::uihandler::SystemHandler; |
20 use super::uihandler::SystemHandler; |
21 |
21 |
24 NotReady(S), |
24 NotReady(S), |
25 PollAgain(S), |
25 PollAgain(S), |
26 } |
26 } |
27 |
27 |
28 enum CommandState<C, H> |
28 enum CommandState<C, H> |
29 where C: Connection, |
29 where |
30 H: SystemHandler, |
30 C: Connection, |
|
31 H: SystemHandler, |
31 { |
32 { |
32 Running(MessageLoop<C>, H), |
33 Running(MessageLoop<C>, H), |
33 SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future), |
34 SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future), |
34 AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H), |
35 AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H), |
35 WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future), |
36 WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future), |
39 type CommandPoll<C, H> = io::Result<(AsyncS<(Client<C>, H, i32), CommandState<C, H>>)>; |
40 type CommandPoll<C, H> = io::Result<(AsyncS<(Client<C>, H, i32), CommandState<C, H>>)>; |
40 |
41 |
41 /// Future resolves to `(exit_code, client)`. |
42 /// Future resolves to `(exit_code, client)`. |
42 #[must_use = "futures do nothing unless polled"] |
43 #[must_use = "futures do nothing unless polled"] |
43 pub struct ChgRunCommand<C, H> |
44 pub struct ChgRunCommand<C, H> |
44 where C: Connection, |
45 where |
45 H: SystemHandler, |
46 C: Connection, |
|
47 H: SystemHandler, |
46 { |
48 { |
47 state: CommandState<C, H>, |
49 state: CommandState<C, H>, |
48 } |
50 } |
49 |
51 |
50 impl<C, H> ChgRunCommand<C, H> |
52 impl<C, H> ChgRunCommand<C, H> |
51 where C: Connection + AsRawFd, |
53 where |
52 H: SystemHandler, |
54 C: Connection + AsRawFd, |
|
55 H: SystemHandler, |
53 { |
56 { |
54 pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) |
57 pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> { |
55 -> ChgRunCommand<C, H> { |
|
56 let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args); |
58 let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args); |
57 ChgRunCommand { |
59 ChgRunCommand { |
58 state: CommandState::Running(msg_loop, handler), |
60 state: CommandState::Running(msg_loop, handler), |
59 } |
61 } |
60 } |
62 } |
61 } |
63 } |
62 |
64 |
63 impl<C, H> Future for ChgRunCommand<C, H> |
65 impl<C, H> Future for ChgRunCommand<C, H> |
64 where C: Connection + AsRawFd, |
66 where |
65 H: SystemHandler, |
67 C: Connection + AsRawFd, |
|
68 H: SystemHandler, |
66 { |
69 { |
67 type Item = (Client<C>, H, i32); |
70 type Item = (Client<C>, H, i32); |
68 type Error = io::Error; |
71 type Error = io::Error; |
69 |
72 |
70 fn poll(&mut self) -> Poll<Self::Item, Self::Error> { |
73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> { |
85 } |
88 } |
86 } |
89 } |
87 } |
90 } |
88 |
91 |
89 impl<C, H> CommandState<C, H> |
92 impl<C, H> CommandState<C, H> |
90 where C: Connection + AsRawFd, |
93 where |
91 H: SystemHandler, |
94 C: Connection + AsRawFd, |
|
95 H: SystemHandler, |
92 { |
96 { |
93 fn poll(self) -> CommandPoll<C, H> { |
97 fn poll(self) -> CommandPoll<C, H> { |
94 match self { |
98 match self { |
95 CommandState::Running(mut msg_loop, handler) => { |
99 CommandState::Running(mut msg_loop, handler) => { |
96 if let Async::Ready((client, msg)) = msg_loop.poll()? { |
100 if let Async::Ready((client, msg)) = msg_loop.poll()? { |
100 } |
104 } |
101 } |
105 } |
102 CommandState::SpawningPager(client, mut fut) => { |
106 CommandState::SpawningPager(client, mut fut) => { |
103 if let Async::Ready((handler, pin)) = fut.poll()? { |
107 if let Async::Ready((handler, pin)) = fut.poll()? { |
104 let fut = AttachIo::with_client(client, io::stdin(), pin, None); |
108 let fut = AttachIo::with_client(client, io::stdin(), pin, None); |
105 Ok(AsyncS::PollAgain(CommandState::AttachingPager(fut, handler))) |
109 Ok(AsyncS::PollAgain(CommandState::AttachingPager( |
|
110 fut, handler, |
|
111 ))) |
106 } else { |
112 } else { |
107 Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut))) |
113 Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut))) |
108 } |
114 } |
109 } |
115 } |
110 CommandState::AttachingPager(mut fut, handler) => { |
116 CommandState::AttachingPager(mut fut, handler) => { |
111 if let Async::Ready(client) = fut.poll()? { |
117 if let Async::Ready(client) = fut.poll()? { |
112 let msg_loop = MessageLoop::start(client, b""); // terminator |
118 let msg_loop = MessageLoop::start(client, b""); // terminator |
113 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) |
119 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) |
114 } else { |
120 } else { |
115 Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler))) |
121 Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler))) |
116 } |
122 } |
117 } |
123 } |
122 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) |
128 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) |
123 } else { |
129 } else { |
124 Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut))) |
130 Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut))) |
125 } |
131 } |
126 } |
132 } |
127 CommandState::Finished => panic!("poll ChgRunCommand after it's done") |
133 CommandState::Finished => panic!("poll ChgRunCommand after it's done"), |
128 } |
134 } |
129 } |
135 } |
130 } |
136 } |
131 |
137 |
132 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H> |
138 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H> |
133 where C: Connection, |
139 where |
134 H: SystemHandler, |
140 C: Connection, |
|
141 H: SystemHandler, |
135 { |
142 { |
136 match msg { |
143 match msg { |
137 ChannelMessage::Data(b'r', data) => { |
144 ChannelMessage::Data(b'r', data) => { |
138 let code = message::parse_result_code(data)?; |
145 let code = message::parse_result_code(data)?; |
139 Ok(AsyncS::Ready((client, handler, code))) |
146 Ok(AsyncS::Ready((client, handler, code))) |
141 ChannelMessage::Data(..) => { |
148 ChannelMessage::Data(..) => { |
142 // just ignores data sent to optional channel |
149 // just ignores data sent to optional channel |
143 let msg_loop = MessageLoop::resume(client); |
150 let msg_loop = MessageLoop::resume(client); |
144 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) |
151 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) |
145 } |
152 } |
146 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => { |
153 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(io::Error::new( |
147 Err(io::Error::new(io::ErrorKind::InvalidData, "unsupported request")) |
154 io::ErrorKind::InvalidData, |
148 } |
155 "unsupported request", |
|
156 )), |
149 ChannelMessage::SystemRequest(data) => { |
157 ChannelMessage::SystemRequest(data) => { |
150 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?; |
158 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?; |
151 match cmd_type { |
159 match cmd_type { |
152 CommandType::Pager => { |
160 CommandType::Pager => { |
153 let fut = handler.spawn_pager(cmd_spec).into_future(); |
161 let fut = handler.spawn_pager(cmd_spec).into_future(); |