Mercurial > hg
view rust/chg/src/runcommand.rs @ 43818:ce088b38f92b
rust: run rustfmt
# skip-blame automated reformatting
Differential Revision: https://phab.mercurial-scm.org/D7578
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 07 Dec 2019 13:06:25 -0800 |
parents | 571d8eb39095 |
children | 90e05b304902 |
line wrap: on
line source
// Copyright 2018 Yuya Nishihara <yuya@tcha.org> // // This software may be used and distributed according to the terms of the // GNU General Public License version 2 or any later version. //! Functions to run Mercurial command in cHg-aware command server. use bytes::Bytes; use futures::future::IntoFuture; use futures::{Async, Future, Poll}; use std::io; use std::mem; use std::os::unix::io::AsRawFd; use tokio_hglib::codec::ChannelMessage; use tokio_hglib::protocol::MessageLoop; use tokio_hglib::{Client, Connection}; use super::attachio::AttachIo; use super::message::{self, CommandType}; use super::uihandler::SystemHandler; enum AsyncS<R, S> { Ready(R), NotReady(S), PollAgain(S), } enum CommandState<C, H> where C: Connection, H: SystemHandler, { Running(MessageLoop<C>, H), SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future), AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H), WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future), Finished, } type CommandPoll<C, H> = io::Result<(AsyncS<(Client<C>, H, i32), CommandState<C, H>>)>; /// Future resolves to `(exit_code, client)`. #[must_use = "futures do nothing unless polled"] pub struct ChgRunCommand<C, H> where C: Connection, H: SystemHandler, { state: CommandState<C, H>, } impl<C, H> ChgRunCommand<C, H> where C: Connection + AsRawFd, H: SystemHandler, { pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> { let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args); ChgRunCommand { state: CommandState::Running(msg_loop, handler), } } } impl<C, H> Future for ChgRunCommand<C, H> where C: Connection + AsRawFd, H: SystemHandler, { type Item = (Client<C>, H, i32); type Error = io::Error; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { loop { let state = mem::replace(&mut self.state, CommandState::Finished); match state.poll()? { AsyncS::Ready((client, handler, code)) => { return Ok(Async::Ready((client, handler, code))); } AsyncS::NotReady(newstate) => { self.state = newstate; return Ok(Async::NotReady); } AsyncS::PollAgain(newstate) => { self.state = newstate; } } } } } impl<C, H> CommandState<C, H> where C: Connection + AsRawFd, H: SystemHandler, { fn poll(self) -> CommandPoll<C, H> { match self { CommandState::Running(mut msg_loop, handler) => { if let Async::Ready((client, msg)) = msg_loop.poll()? { process_message(client, handler, msg) } else { Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler))) } } CommandState::SpawningPager(client, mut fut) => { if let Async::Ready((handler, pin)) = fut.poll()? { let fut = AttachIo::with_client(client, io::stdin(), pin, None); Ok(AsyncS::PollAgain(CommandState::AttachingPager( fut, handler, ))) } else { Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut))) } } CommandState::AttachingPager(mut fut, handler) => { if let Async::Ready(client) = fut.poll()? { let msg_loop = MessageLoop::start(client, b""); // terminator Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) } else { Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler))) } } CommandState::WaitingSystem(client, mut fut) => { if let Async::Ready((handler, code)) = fut.poll()? { let data = message::pack_result_code(code); let msg_loop = MessageLoop::resume_with_data(client, data); Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) } else { Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut))) } } CommandState::Finished => panic!("poll ChgRunCommand after it's done"), } } } fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H> where C: Connection, H: SystemHandler, { match msg { ChannelMessage::Data(b'r', data) => { let code = message::parse_result_code(data)?; Ok(AsyncS::Ready((client, handler, code))) } ChannelMessage::Data(..) => { // just ignores data sent to optional channel let msg_loop = MessageLoop::resume(client); Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) } ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(io::Error::new( io::ErrorKind::InvalidData, "unsupported request", )), ChannelMessage::SystemRequest(data) => { let (cmd_type, cmd_spec) = message::parse_command_spec(data)?; match cmd_type { CommandType::Pager => { let fut = handler.spawn_pager(cmd_spec).into_future(); Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut))) } CommandType::System => { let fut = handler.run_system(cmd_spec).into_future(); Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut))) } } } } }