Mercurial > hg
changeset 39975:571d8eb39095
rust-chg: add state machine to handle "runcommand" request with cHg extension
This is modeled after tokio-hglib's RunCommand state to support the "S"
channel message.
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Mon, 24 Sep 2018 18:21:10 +0900 |
parents | a9c5fc436fd5 |
children | 44840bcc411a |
files | rust/chg/src/lib.rs rust/chg/src/runcommand.rs |
diffstat | 2 files changed, 164 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/rust/chg/src/lib.rs Mon Sep 24 18:18:35 2018 +0900 +++ b/rust/chg/src/lib.rs Mon Sep 24 18:21:10 2018 +0900 @@ -14,6 +14,7 @@ pub mod attachio; pub mod message; pub mod procutil; +pub mod runcommand; mod uihandler; pub use uihandler::{ChgUiHandler, SystemHandler};
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rust/chg/src/runcommand.rs Mon Sep 24 18:21:10 2018 +0900 @@ -0,0 +1,163 @@ +// Copyright 2018 Yuya Nishihara <yuya@tcha.org> +// +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2 or any later version. + +//! Functions to run Mercurial command in cHg-aware command server. + +use bytes::Bytes; +use futures::future::IntoFuture; +use futures::{Async, Future, Poll}; +use std::io; +use std::mem; +use std::os::unix::io::AsRawFd; +use tokio_hglib::{Client, Connection}; +use tokio_hglib::codec::ChannelMessage; +use tokio_hglib::protocol::MessageLoop; + +use super::attachio::AttachIo; +use super::message::{self, CommandType}; +use super::uihandler::SystemHandler; + +enum AsyncS<R, S> { + Ready(R), + NotReady(S), + PollAgain(S), +} + +enum CommandState<C, H> + where C: Connection, + H: SystemHandler, +{ + Running(MessageLoop<C>, H), + SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future), + AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H), + WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future), + Finished, +} + +type CommandPoll<C, H> = io::Result<(AsyncS<(Client<C>, H, i32), CommandState<C, H>>)>; + +/// Future resolves to `(exit_code, client)`. +#[must_use = "futures do nothing unless polled"] +pub struct ChgRunCommand<C, H> + where C: Connection, + H: SystemHandler, +{ + state: CommandState<C, H>, +} + +impl<C, H> ChgRunCommand<C, H> + where C: Connection + AsRawFd, + H: SystemHandler, +{ + pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) + -> ChgRunCommand<C, H> { + let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args); + ChgRunCommand { + state: CommandState::Running(msg_loop, handler), + } + } +} + +impl<C, H> Future for ChgRunCommand<C, H> + where C: Connection + AsRawFd, + H: SystemHandler, +{ + type Item = (Client<C>, H, i32); + type Error = io::Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + loop { + let state = mem::replace(&mut self.state, CommandState::Finished); + match state.poll()? { + AsyncS::Ready((client, handler, code)) => { + return Ok(Async::Ready((client, handler, code))); + } + AsyncS::NotReady(newstate) => { + self.state = newstate; + return Ok(Async::NotReady); + } + AsyncS::PollAgain(newstate) => { + self.state = newstate; + } + } + } + } +} + +impl<C, H> CommandState<C, H> + where C: Connection + AsRawFd, + H: SystemHandler, +{ + fn poll(self) -> CommandPoll<C, H> { + match self { + CommandState::Running(mut msg_loop, handler) => { + if let Async::Ready((client, msg)) = msg_loop.poll()? { + process_message(client, handler, msg) + } else { + Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler))) + } + } + CommandState::SpawningPager(client, mut fut) => { + if let Async::Ready((handler, pin)) = fut.poll()? { + let fut = AttachIo::with_client(client, io::stdin(), pin, None); + Ok(AsyncS::PollAgain(CommandState::AttachingPager(fut, handler))) + } else { + Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut))) + } + } + CommandState::AttachingPager(mut fut, handler) => { + if let Async::Ready(client) = fut.poll()? { + let msg_loop = MessageLoop::start(client, b""); // terminator + Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) + } else { + Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler))) + } + } + CommandState::WaitingSystem(client, mut fut) => { + if let Async::Ready((handler, code)) = fut.poll()? { + let data = message::pack_result_code(code); + let msg_loop = MessageLoop::resume_with_data(client, data); + Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) + } else { + Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut))) + } + } + CommandState::Finished => panic!("poll ChgRunCommand after it's done") + } + } +} + +fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H> + where C: Connection, + H: SystemHandler, +{ + match msg { + ChannelMessage::Data(b'r', data) => { + let code = message::parse_result_code(data)?; + Ok(AsyncS::Ready((client, handler, code))) + } + ChannelMessage::Data(..) => { + // just ignores data sent to optional channel + let msg_loop = MessageLoop::resume(client); + Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) + } + ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => { + Err(io::Error::new(io::ErrorKind::InvalidData, "unsupported request")) + } + ChannelMessage::SystemRequest(data) => { + let (cmd_type, cmd_spec) = message::parse_command_spec(data)?; + match cmd_type { + CommandType::Pager => { + let fut = handler.spawn_pager(cmd_spec).into_future(); + Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut))) + } + CommandType::System => { + let fut = handler.run_system(cmd_spec).into_future(); + Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut))) + } + } + } + } +}