rust/chg/src/runcommand.rs
changeset 43836 ce088b38f92b
parent 39976 571d8eb39095
child 44686 90e05b304902
equal deleted inserted replaced
43835:d9f85f61f0ed 43836:ce088b38f92b
     9 use futures::future::IntoFuture;
     9 use futures::future::IntoFuture;
    10 use futures::{Async, Future, Poll};
    10 use futures::{Async, Future, Poll};
    11 use std::io;
    11 use std::io;
    12 use std::mem;
    12 use std::mem;
    13 use std::os::unix::io::AsRawFd;
    13 use std::os::unix::io::AsRawFd;
    14 use tokio_hglib::{Client, Connection};
       
    15 use tokio_hglib::codec::ChannelMessage;
    14 use tokio_hglib::codec::ChannelMessage;
    16 use tokio_hglib::protocol::MessageLoop;
    15 use tokio_hglib::protocol::MessageLoop;
       
    16 use tokio_hglib::{Client, Connection};
    17 
    17 
    18 use super::attachio::AttachIo;
    18 use super::attachio::AttachIo;
    19 use super::message::{self, CommandType};
    19 use super::message::{self, CommandType};
    20 use super::uihandler::SystemHandler;
    20 use super::uihandler::SystemHandler;
    21 
    21 
    24     NotReady(S),
    24     NotReady(S),
    25     PollAgain(S),
    25     PollAgain(S),
    26 }
    26 }
    27 
    27 
    28 enum CommandState<C, H>
    28 enum CommandState<C, H>
    29     where C: Connection,
    29 where
    30           H: SystemHandler,
    30     C: Connection,
       
    31     H: SystemHandler,
    31 {
    32 {
    32     Running(MessageLoop<C>, H),
    33     Running(MessageLoop<C>, H),
    33     SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
    34     SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
    34     AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
    35     AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
    35     WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
    36     WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
    39 type CommandPoll<C, H> = io::Result<(AsyncS<(Client<C>, H, i32), CommandState<C, H>>)>;
    40 type CommandPoll<C, H> = io::Result<(AsyncS<(Client<C>, H, i32), CommandState<C, H>>)>;
    40 
    41 
    41 /// Future resolves to `(client, handler, exit_code)`.
    42 /// Future resolves to `(client, handler, exit_code)`.
    42 #[must_use = "futures do nothing unless polled"]
    43 #[must_use = "futures do nothing unless polled"]
    43 pub struct ChgRunCommand<C, H>
    44 pub struct ChgRunCommand<C, H>
    44     where C: Connection,
    45 where
    45           H: SystemHandler,
    46     C: Connection,
       
    47     H: SystemHandler,
    46 {
    48 {
    47     state: CommandState<C, H>,
    49     state: CommandState<C, H>,
    48 }
    50 }
    49 
    51 
    50 impl<C, H> ChgRunCommand<C, H>
    52 impl<C, H> ChgRunCommand<C, H>
    51     where C: Connection + AsRawFd,
    53 where
    52           H: SystemHandler,
    54     C: Connection + AsRawFd,
       
    55     H: SystemHandler,
    53 {
    56 {
    54     pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes)
    57     pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> {
    55                        -> ChgRunCommand<C, H> {
       
    56         let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
    58         let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
    57         ChgRunCommand {
    59         ChgRunCommand {
    58             state: CommandState::Running(msg_loop, handler),
    60             state: CommandState::Running(msg_loop, handler),
    59         }
    61         }
    60     }
    62     }
    61 }
    63 }
    62 
    64 
    63 impl<C, H> Future for ChgRunCommand<C, H>
    65 impl<C, H> Future for ChgRunCommand<C, H>
    64     where C: Connection + AsRawFd,
    66 where
    65           H: SystemHandler,
    67     C: Connection + AsRawFd,
       
    68     H: SystemHandler,
    66 {
    69 {
    67     type Item = (Client<C>, H, i32);
    70     type Item = (Client<C>, H, i32);
    68     type Error = io::Error;
    71     type Error = io::Error;
    69 
    72 
    70     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    73     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    85         }
    88         }
    86     }
    89     }
    87 }
    90 }
    88 
    91 
    89 impl<C, H> CommandState<C, H>
    92 impl<C, H> CommandState<C, H>
    90     where C: Connection + AsRawFd,
    93 where
    91           H: SystemHandler,
    94     C: Connection + AsRawFd,
       
    95     H: SystemHandler,
    92 {
    96 {
    93     fn poll(self) -> CommandPoll<C, H> {
    97     fn poll(self) -> CommandPoll<C, H> {
    94         match self {
    98         match self {
    95             CommandState::Running(mut msg_loop, handler) => {
    99             CommandState::Running(mut msg_loop, handler) => {
    96                 if let Async::Ready((client, msg)) = msg_loop.poll()? {
   100                 if let Async::Ready((client, msg)) = msg_loop.poll()? {
   100                 }
   104                 }
   101             }
   105             }
   102             CommandState::SpawningPager(client, mut fut) => {
   106             CommandState::SpawningPager(client, mut fut) => {
   103                 if let Async::Ready((handler, pin)) = fut.poll()? {
   107                 if let Async::Ready((handler, pin)) = fut.poll()? {
   104                     let fut = AttachIo::with_client(client, io::stdin(), pin, None);
   108                     let fut = AttachIo::with_client(client, io::stdin(), pin, None);
   105                     Ok(AsyncS::PollAgain(CommandState::AttachingPager(fut, handler)))
   109                     Ok(AsyncS::PollAgain(CommandState::AttachingPager(
       
   110                         fut, handler,
       
   111                     )))
   106                 } else {
   112                 } else {
   107                     Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
   113                     Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
   108                 }
   114                 }
   109             }
   115             }
   110             CommandState::AttachingPager(mut fut, handler) => {
   116             CommandState::AttachingPager(mut fut, handler) => {
   111                 if let Async::Ready(client) = fut.poll()? {
   117                 if let Async::Ready(client) = fut.poll()? {
   112                     let msg_loop = MessageLoop::start(client, b"");  // terminator
   118                     let msg_loop = MessageLoop::start(client, b""); // terminator
   113                     Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
   119                     Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
   114                 } else {
   120                 } else {
   115                     Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
   121                     Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
   116                 }
   122                 }
   117             }
   123             }
   122                     Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
   128                     Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
   123                 } else {
   129                 } else {
   124                     Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
   130                     Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
   125                 }
   131                 }
   126             }
   132             }
   127             CommandState::Finished => panic!("poll ChgRunCommand after it's done")
   133             CommandState::Finished => panic!("poll ChgRunCommand after it's done"),
   128         }
   134         }
   129     }
   135     }
   130 }
   136 }
   131 
   137 
   132 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
   138 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
   133     where C: Connection,
   139 where
   134           H: SystemHandler,
   140     C: Connection,
       
   141     H: SystemHandler,
   135 {
   142 {
   136     match msg {
   143     match msg {
   137         ChannelMessage::Data(b'r', data) => {
   144         ChannelMessage::Data(b'r', data) => {
   138             let code = message::parse_result_code(data)?;
   145             let code = message::parse_result_code(data)?;
   139             Ok(AsyncS::Ready((client, handler, code)))
   146             Ok(AsyncS::Ready((client, handler, code)))
   141         ChannelMessage::Data(..) => {
   148         ChannelMessage::Data(..) => {
   142             // just ignores data sent to optional channel
   149             // just ignores data sent to optional channel
   143             let msg_loop = MessageLoop::resume(client);
   150             let msg_loop = MessageLoop::resume(client);
   144             Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
   151             Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
   145         }
   152         }
   146         ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => {
   153         ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(io::Error::new(
   147             Err(io::Error::new(io::ErrorKind::InvalidData, "unsupported request"))
   154             io::ErrorKind::InvalidData,
   148         }
   155             "unsupported request",
       
   156         )),
   149         ChannelMessage::SystemRequest(data) => {
   157         ChannelMessage::SystemRequest(data) => {
   150             let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
   158             let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
   151             match cmd_type {
   159             match cmd_type {
   152                 CommandType::Pager => {
   160                 CommandType::Pager => {
   153                     let fut = handler.spawn_pager(cmd_spec).into_future();
   161                     let fut = handler.spawn_pager(cmd_spec).into_future();