Mercurial > hg-stable
changeset 44683:9ce613d648de
rust-chg: add config validation and process returned instructions
This is the reimplementation of runinstructions() and main() in chg.c.
In Rust version, we only pass in early arguments to the server as the locator
doesn't know the full arguments. This should be fine since these arguments
are just passed in to _earlyparseopts() and _parseconfig(), which means the
server doesn't need full arguments.
Another difference is the handling of the "exit <code>" instruction. In Rust
version, we can simply reuse the connection instead of "exit(code)" as the
command error isn't displayed yet. That's because the client-side stdio is not
attached until the connection is validated. This behavior is cleaner than C,
but it also means that the early server exception wouldn't be propagated to
client because stderr isn't attached. So we might have to reconsider when to
attach/detach the server stdio.
Differential Revision: https://phab.mercurial-scm.org/D8381
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Sun, 07 Oct 2018 16:14:21 +0900 |
parents | 00ac60658654 |
children | 065048e66f32 |
files | rust/chg/src/locator.rs |
diffstat | 1 files changed, 105 insertions(+), 8 deletions(-) [+] |
line wrap: on
line diff
--- a/rust/chg/src/locator.rs Sun Oct 07 16:46:30 2018 +0900 +++ b/rust/chg/src/locator.rs Sun Oct 07 16:14:21 2018 +0900 @@ -21,10 +21,11 @@ use tokio_timer; use super::clientext::ChgClientExt; -use super::message::ServerSpec; +use super::message::{Instruction, ServerSpec}; use super::procutil; -const REQUIRED_SERVER_CAPABILITIES: &[&str] = &["attachio", "chdir", "runcommand", "setenv"]; +const REQUIRED_SERVER_CAPABILITIES: &[&str] = + &["attachio", "chdir", "runcommand", "setenv", "validate"]; /// Helper to connect to and spawn a server process. #[derive(Clone, Debug)] @@ -35,6 +36,7 @@ env_vars: Vec<(OsString, OsString)>, process_id: u32, base_sock_path: PathBuf, + redirect_sock_path: Option<PathBuf>, timeout: Duration, } @@ -51,6 +53,7 @@ env_vars: env::vars_os().collect(), process_id: process::id(), base_sock_path: prepare_server_socket_path()?, + redirect_sock_path: None, timeout: default_timeout(), }) } @@ -77,16 +80,110 @@ /// /// The server process will be spawned if not running. pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { - self.try_connect() + future::loop_fn((self, 0), |(loc, cnt)| { + if cnt < 10 { + let fut = loc + .try_connect() + .and_then(|(loc, client)| { + client + .validate(&loc.hg_early_args) + .map(|(client, instructions)| (loc, client, instructions)) + }) + .and_then(move |(loc, client, instructions)| { + loc.run_instructions(client, instructions, cnt) + }); + Either::A(fut) + } else { + let msg = format!( + concat!( + "too many redirections.\n", + "Please make sure {:?} is not a wrapper which ", + "changes sensitive environment variables ", + "before executing hg. If you have to use a ", + "wrapper, wrap chg instead of hg.", + ), + loc.hg_command + ); + Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg))) + } + }) + } + + /// Runs instructions received from the server. + fn run_instructions( + mut self, + client: UnixClient, + instructions: Vec<Instruction>, + cnt: usize, + ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> { + let mut reconnect = false; + for inst in instructions { + debug!("instruction: {:?}", inst); + match inst { + Instruction::Exit(_) => { + // Just returns the current connection to run the + // unparsable command and report the error + return Ok(Loop::Break((self, client))); + } + Instruction::Reconnect => { + reconnect = true; + } + Instruction::Redirect(path) => { + if path.parent() != self.base_sock_path.parent() { + let msg = format!( + "insecure redirect instruction from server: {}", + path.display() + ); + return Err(io::Error::new(io::ErrorKind::InvalidData, msg)); + } + self.redirect_sock_path = Some(path); + reconnect = true; + } + Instruction::Unlink(path) => { + if path.parent() != self.base_sock_path.parent() { + let msg = format!( + "insecure unlink instruction from server: {}", + path.display() + ); + return Err(io::Error::new(io::ErrorKind::InvalidData, msg)); + } + fs::remove_file(path).unwrap_or(()); // may race + } + } + } + + if reconnect { + Ok(Loop::Continue((self, cnt + 1))) + } else { + Ok(Loop::Break((self, client))) + } } /// Tries to connect to the existing server, or spawns new if not running. fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { - debug!("try connect to {}", self.base_sock_path.display()); - UnixClient::connect(self.base_sock_path.clone()) - .then(|res| match res { - Ok(client) => Either::A(future::ok((self, client))), - Err(_) => Either::B(self.spawn_connect()), + let sock_path = self + .redirect_sock_path + .as_ref() + .unwrap_or(&self.base_sock_path) + .clone(); + debug!("try connect to {}", sock_path.display()); + UnixClient::connect(sock_path) + .then(|res| { + match res { + Ok(client) => Either::A(future::ok((self, client))), + Err(_) => { + // Prevent us from being re-connected to the outdated + // master server: We were told by the server to redirect + // to redirect_sock_path, which didn't work. We do not + // want to connect to the same master server again + // because it would probably tell us the same thing. + if self.redirect_sock_path.is_some() { + fs::remove_file(&self.base_sock_path).unwrap_or(()); + // may race + } + Either::B(self.spawn_connect()) + } + } }) .and_then(|(loc, client)| { check_server_capabilities(client.server_spec())?;