Mercurial > hg
changeset 44751:94cace4b80ea
rust-chg: reimplement run_command operation as async function
The crafted state machine is no longer needed thanks to async/await.
The state machine is basically rewritten as follows:
- Ready(..) -> return ..
- PollAgain(..) -> run .. and await
- Err(..) -> return Err(..)
Differential Revision: https://phab.mercurial-scm.org/D8445
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Fri, 10 Apr 2020 22:44:51 +0900 |
parents | c794d0da5fb2 |
children | d6f706929120 |
files | rust/chg/src/lib.rs rust/chg/src/runcommand.rs |
diffstat | 2 files changed, 33 insertions(+), 141 deletions(-) [+] |
line wrap: on
line diff
--- a/rust/chg/src/lib.rs Fri Apr 10 22:23:10 2020 +0900 +++ b/rust/chg/src/lib.rs Fri Apr 10 22:44:51 2020 +0900 @@ -8,7 +8,7 @@ //pub mod locator; pub mod message; pub mod procutil; -//mod runcommand; +mod runcommand; mod uihandler; //pub use clientext::ChgClientExt;
--- a/rust/chg/src/runcommand.rs Fri Apr 10 22:23:10 2020 +0900 +++ b/rust/chg/src/runcommand.rs Fri Apr 10 22:44:51 2020 +0900 @@ -6,164 +6,56 @@ //! Functions to run Mercurial command in cHg-aware command server. use bytes::Bytes; -use futures::future::IntoFuture; -use futures::{Async, Future, Poll}; use std::io; -use std::mem; use std::os::unix::io::AsRawFd; use tokio_hglib::codec::ChannelMessage; -use tokio_hglib::protocol::MessageLoop; -use tokio_hglib::{Client, Connection}; +use tokio_hglib::{Connection, Protocol}; -use crate::attachio::AttachIo; +use crate::attachio; use crate::message::{self, CommandType}; use crate::uihandler::SystemHandler; -enum AsyncS<R, S> { - Ready(R), - NotReady(S), - PollAgain(S), -} - -enum CommandState<C, H> -where - C: Connection, - H: SystemHandler, -{ - Running(MessageLoop<C>, H), - SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future), - AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H), - WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future), - Finished, -} - -type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>; - -/// Future resolves to `(exit_code, client)`. -#[must_use = "futures do nothing unless polled"] -pub struct ChgRunCommand<C, H> -where - C: Connection, - H: SystemHandler, -{ - state: CommandState<C, H>, -} - -impl<C, H> ChgRunCommand<C, H> -where - C: Connection + AsRawFd, - H: SystemHandler, -{ - pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> { - let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args); - ChgRunCommand { - state: CommandState::Running(msg_loop, handler), - } - } -} - -impl<C, H> Future for ChgRunCommand<C, H> -where - C: Connection + AsRawFd, - H: SystemHandler, -{ - type Item = (Client<C>, H, i32); - type Error = io::Error; - - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - loop { - let state = mem::replace(&mut self.state, CommandState::Finished); - match state.poll()? { - AsyncS::Ready((client, handler, code)) => { - return Ok(Async::Ready((client, handler, code))); - } - AsyncS::NotReady(newstate) => { - self.state = newstate; - return Ok(Async::NotReady); - } - AsyncS::PollAgain(newstate) => { - self.state = newstate; - } - } - } - } -} - -impl<C, H> CommandState<C, H> -where - C: Connection + AsRawFd, - H: SystemHandler, -{ - fn poll(self) -> CommandPoll<C, H> { - match self { - CommandState::Running(mut msg_loop, handler) => { - if let Async::Ready((client, msg)) = msg_loop.poll()? { - process_message(client, handler, msg) - } else { - Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler))) - } - } - CommandState::SpawningPager(client, mut fut) => { - if let Async::Ready((handler, pin)) = fut.poll()? { - let fut = AttachIo::with_client(client, io::stdin(), pin, None); - Ok(AsyncS::PollAgain(CommandState::AttachingPager( - fut, handler, - ))) - } else { - Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut))) - } - } - CommandState::AttachingPager(mut fut, handler) => { - if let Async::Ready(client) = fut.poll()? { - let msg_loop = MessageLoop::start(client, b""); // terminator - Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) - } else { - Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler))) - } - } - CommandState::WaitingSystem(client, mut fut) => { - if let Async::Ready((handler, code)) = fut.poll()? { - let data = message::pack_result_code(code); - let msg_loop = MessageLoop::resume_with_data(client, data); - Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) - } else { - Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut))) - } - } - CommandState::Finished => panic!("poll ChgRunCommand after it's done"), - } - } -} - -fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H> -where - C: Connection, - H: SystemHandler, -{ - { - match msg { +/// Runs the given Mercurial command in cHg-aware command server, and +/// fetches the result code. +/// +/// This is a subset of tokio-hglib's `run_command()` with the additional +/// SystemRequest support. +pub async fn run_command( + proto: &mut Protocol<impl Connection + AsRawFd>, + handler: &mut impl SystemHandler, + packed_args: impl Into<Bytes>, +) -> io::Result<i32> { + proto + .send_command_with_args("runcommand", packed_args) + .await?; + loop { + match proto.fetch_response().await? { ChannelMessage::Data(b'r', data) => { - let code = message::parse_result_code(data)?; - Ok(AsyncS::Ready((client, handler, code))) + return message::parse_result_code(data); } ChannelMessage::Data(..) => { // just ignores data sent to optional channel - let msg_loop = MessageLoop::resume(client); - Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) } - ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err( - io::Error::new(io::ErrorKind::InvalidData, "unsupported request"), - ), + ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "unsupported request", + )); + } ChannelMessage::SystemRequest(data) => { let (cmd_type, cmd_spec) = message::parse_command_spec(data)?; match cmd_type { CommandType::Pager => { - let fut = handler.spawn_pager(cmd_spec).into_future(); - Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut))) + // server spins new command loop while pager request is + // in progress, which can be terminated by "" command. + let pin = handler.spawn_pager(&cmd_spec).await?; + attachio::attach_io(proto, &io::stdin(), &pin, &pin).await?; + proto.send_command("").await?; // terminator } CommandType::System => { - let fut = handler.run_system(cmd_spec).into_future(); - Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut))) + let code = handler.run_system(&cmd_spec).await?; + let data = message::pack_result_code(code); + proto.send_data(data).await?; } } }