comparison rust/chg/src/locator.rs @ 44753:a347a329e48d

rust-chg: reimplement locator by using async/await and tokio-0.2. connect_spawned() is rewritten from scratch by using std::process. Before, it would select on completion of either the connection or the server process. The new code could be implemented the same way, but it's much simpler to periodically run try_wait() to detect server death. Differential Revision: https://phab.mercurial-scm.org/D8447
author Yuya Nishihara <yuya@tcha.org>
date Sat, 11 Apr 2020 00:47:32 +0900
parents e9e44e61042b
children 27fe8cc1338f
comparison
equal deleted inserted replaced
44752:d6f706929120 44753:a347a329e48d
3 // This software may be used and distributed according to the terms of the 3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version. 4 // GNU General Public License version 2 or any later version.
5 5
6 //! Utility for locating command-server process. 6 //! Utility for locating command-server process.
7 7
8 use futures::future::{self, Either, Loop};
9 use log::debug; 8 use log::debug;
10 use std::env; 9 use std::env;
11 use std::ffi::{OsStr, OsString}; 10 use std::ffi::{OsStr, OsString};
12 use std::fs::{self, DirBuilder}; 11 use std::fs::{self, DirBuilder};
13 use std::io; 12 use std::io;
14 use std::os::unix::ffi::{OsStrExt, OsStringExt}; 13 use std::os::unix::ffi::{OsStrExt, OsStringExt};
15 use std::os::unix::fs::{DirBuilderExt, MetadataExt}; 14 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
16 use std::path::{Path, PathBuf}; 15 use std::path::{Path, PathBuf};
17 use std::process; 16 use std::process::{self, Child, Command};
18 use std::time::Duration; 17 use std::time::{Duration, Instant};
19 use tokio::prelude::*;
20 use tokio::process::{Child, Command};
21 use tokio::time; 18 use tokio::time;
22 use tokio_hglib::UnixClient; 19
23 20 use crate::clientext::ChgClient;
24 use crate::clientext::ChgClientExt;
25 use crate::message::{Instruction, ServerSpec}; 21 use crate::message::{Instruction, ServerSpec};
26 use crate::procutil; 22 use crate::procutil;
27 23
28 const REQUIRED_SERVER_CAPABILITIES: &[&str] = &[ 24 const REQUIRED_SERVER_CAPABILITIES: &[&str] = &[
29 "attachio", 25 "attachio",
80 } 76 }
81 77
82 /// Connects to the server. 78 /// Connects to the server.
83 /// 79 ///
84 /// The server process will be spawned if not running. 80 /// The server process will be spawned if not running.
85 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { 81 pub async fn connect(&mut self) -> io::Result<ChgClient> {
86 future::loop_fn((self, 0), |(loc, cnt)| { 82 for _cnt in 0..10 {
87 if cnt < 10 { 83 let mut client = self.try_connect().await?;
88 let fut = loc 84 let instructions = client.validate(&self.hg_early_args).await?;
89 .try_connect() 85 let reconnect = self.run_instructions(&instructions)?;
90 .and_then(|(loc, client)| { 86 if !reconnect {
91 client 87 return Ok(client);
92 .validate(&loc.hg_early_args) 88 }
93 .map(|(client, instructions)| (loc, client, instructions)) 89 }
94 }) 90
95 .and_then(move |(loc, client, instructions)| { 91 // TODO: unindent
96 loc.run_instructions(client, instructions, cnt) 92 {
97 }); 93 {
98 Either::A(fut)
99 } else {
100 let msg = format!( 94 let msg = format!(
101 concat!( 95 concat!(
102 "too many redirections.\n", 96 "too many redirections.\n",
103 "Please make sure {:?} is not a wrapper which ", 97 "Please make sure {:?} is not a wrapper which ",
104 "changes sensitive environment variables ", 98 "changes sensitive environment variables ",
105 "before executing hg. If you have to use a ", 99 "before executing hg. If you have to use a ",
106 "wrapper, wrap chg instead of hg.", 100 "wrapper, wrap chg instead of hg.",
107 ), 101 ),
108 loc.hg_command 102 self.hg_command
109 ); 103 );
110 Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg))) 104 Err(io::Error::new(io::ErrorKind::Other, msg))
111 } 105 }
112 }) 106 }
113 } 107 }
114 108
115 /// Runs instructions received from the server. 109 /// Runs instructions received from the server.
116 fn run_instructions( 110 ///
117 mut self, 111 /// Returns true if the client should try connecting to the other server.
118 client: UnixClient, 112 fn run_instructions(&mut self, instructions: &[Instruction]) -> io::Result<bool> {
119 instructions: Vec<Instruction>,
120 cnt: usize,
121 ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> {
122 let mut reconnect = false; 113 let mut reconnect = false;
123 for inst in instructions { 114 for inst in instructions {
124 debug!("instruction: {:?}", inst); 115 debug!("instruction: {:?}", inst);
125 match inst { 116 match inst {
126 Instruction::Exit(_) => { 117 Instruction::Exit(_) => {
127 // Just returns the current connection to run the 118 // Just returns the current connection to run the
128 // unparsable command and report the error 119 // unparsable command and report the error
129 return Ok(Loop::Break((self, client))); 120 return Ok(false);
130 } 121 }
131 Instruction::Reconnect => { 122 Instruction::Reconnect => {
132 reconnect = true; 123 reconnect = true;
133 } 124 }
134 Instruction::Redirect(path) => { 125 Instruction::Redirect(path) => {
137 "insecure redirect instruction from server: {}", 128 "insecure redirect instruction from server: {}",
138 path.display() 129 path.display()
139 ); 130 );
140 return Err(io::Error::new(io::ErrorKind::InvalidData, msg)); 131 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
141 } 132 }
142 self.redirect_sock_path = Some(path); 133 self.redirect_sock_path = Some(path.to_owned());
143 reconnect = true; 134 reconnect = true;
144 } 135 }
145 Instruction::Unlink(path) => { 136 Instruction::Unlink(path) => {
146 if path.parent() != self.base_sock_path.parent() { 137 if path.parent() != self.base_sock_path.parent() {
147 let msg = format!( 138 let msg = format!(
153 fs::remove_file(path).unwrap_or(()); // may race 144 fs::remove_file(path).unwrap_or(()); // may race
154 } 145 }
155 } 146 }
156 } 147 }
157 148
158 if reconnect { 149 Ok(reconnect)
159 Ok(Loop::Continue((self, cnt + 1)))
160 } else {
161 Ok(Loop::Break((self, client)))
162 }
163 } 150 }
164 151
165 /// Tries to connect to the existing server, or spawns new if not running. 152 /// Tries to connect to the existing server, or spawns new if not running.
166 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { 153 async fn try_connect(&mut self) -> io::Result<ChgClient> {
167 let sock_path = self 154 let sock_path = self
168 .redirect_sock_path 155 .redirect_sock_path
169 .as_ref() 156 .as_ref()
170 .unwrap_or(&self.base_sock_path) 157 .unwrap_or(&self.base_sock_path)
171 .clone(); 158 .clone();
172 debug!("try connect to {}", sock_path.display()); 159 debug!("try connect to {}", sock_path.display());
173 UnixClient::connect(sock_path) 160 // TODO: unindent
174 .then(|res| { 161 {
175 match res { 162 {
176 Ok(client) => Either::A(future::ok((self, client))), 163 let mut client = match ChgClient::connect(sock_path).await {
164 Ok(client) => client,
177 Err(_) => { 165 Err(_) => {
178 // Prevent us from being re-connected to the outdated 166 // Prevent us from being re-connected to the outdated
179 // master server: We were told by the server to redirect 167 // master server: We were told by the server to redirect
180 // to redirect_sock_path, which didn't work. We do not 168 // to redirect_sock_path, which didn't work. We do not
181 // want to connect to the same master server again 169 // want to connect to the same master server again
182 // because it would probably tell us the same thing. 170 // because it would probably tell us the same thing.
183 if self.redirect_sock_path.is_some() { 171 if self.redirect_sock_path.is_some() {
184 fs::remove_file(&self.base_sock_path).unwrap_or(()); 172 fs::remove_file(&self.base_sock_path).unwrap_or(());
185 // may race 173 // may race
186 } 174 }
187 Either::B(self.spawn_connect()) 175 self.spawn_connect().await?
188 } 176 }
189 } 177 };
190 })
191 .and_then(|(loc, client)| {
192 check_server_capabilities(client.server_spec())?; 178 check_server_capabilities(client.server_spec())?;
193 Ok((loc, client))
194 })
195 .and_then(|(loc, client)| {
196 // It's purely optional, and the server might not support this command. 179 // It's purely optional, and the server might not support this command.
197 if client.server_spec().capabilities.contains("setprocname") { 180 if client.server_spec().capabilities.contains("setprocname") {
198 let fut = client 181 client
199 .set_process_name(format!("chg[worker/{}]", loc.process_id)) 182 .set_process_name(format!("chg[worker/{}]", self.process_id))
200 .map(|client| (loc, client)); 183 .await?;
201 Either::A(fut) 184 }
202 } else { 185 client.set_current_dir(&self.current_dir).await?;
203 Either::B(future::ok((loc, client)))
204 }
205 })
206 .and_then(|(loc, client)| {
207 client 186 client
208 .set_current_dir(&loc.current_dir) 187 .set_env_vars_os(self.env_vars.iter().cloned())
209 .map(|client| (loc, client)) 188 .await?;
210 }) 189 Ok(client)
211 .and_then(|(loc, client)| { 190 }
212 client 191 }
213 .set_env_vars_os(loc.env_vars.iter().cloned())
214 .map(|client| (loc, client))
215 })
216 } 192 }
217 193
218 /// Spawns new server process and connects to it. 194 /// Spawns new server process and connects to it.
219 /// 195 ///
220 /// The server will be spawned at the current working directory, then 196 /// The server will be spawned at the current working directory, then
221 /// chdir to "/", so that the server will load configs from the target 197 /// chdir to "/", so that the server will load configs from the target
222 /// repository. 198 /// repository.
223 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { 199 async fn spawn_connect(&mut self) -> io::Result<ChgClient> {
224 let sock_path = self.temp_sock_path(); 200 let sock_path = self.temp_sock_path();
225 debug!("start cmdserver at {}", sock_path.display()); 201 debug!("start cmdserver at {}", sock_path.display());
226 Command::new(&self.hg_command) 202 let server = Command::new(&self.hg_command)
227 .arg("serve") 203 .arg("serve")
228 .arg("--cmdserver") 204 .arg("--cmdserver")
229 .arg("chgunix") 205 .arg("chgunix")
230 .arg("--address") 206 .arg("--address")
231 .arg(&sock_path) 207 .arg(&sock_path)
234 .args(&self.hg_early_args) 210 .args(&self.hg_early_args)
235 .current_dir(&self.current_dir) 211 .current_dir(&self.current_dir)
236 .env_clear() 212 .env_clear()
237 .envs(self.env_vars.iter().cloned()) 213 .envs(self.env_vars.iter().cloned())
238 .env("CHGINTERNALMARK", "") 214 .env("CHGINTERNALMARK", "")
239 .spawn() 215 .spawn()?;
240 .into_future() 216 let client = self.connect_spawned(server, &sock_path).await?;
241 .and_then(|server| self.connect_spawned(server, sock_path)) 217 // TODO: unindent
242 .and_then(|(loc, client, sock_path)| { 218 {
219 {
243 debug!( 220 debug!(
244 "rename {} to {}", 221 "rename {} to {}",
245 sock_path.display(), 222 sock_path.display(),
246 loc.base_sock_path.display() 223 self.base_sock_path.display()
247 ); 224 );
248 fs::rename(&sock_path, &loc.base_sock_path)?; 225 fs::rename(&sock_path, &self.base_sock_path)?;
249 Ok((loc, client)) 226 Ok(client)
250 }) 227 }
228 }
251 } 229 }
252 230
253 /// Tries to connect to the just spawned server repeatedly until timeout 231 /// Tries to connect to the just spawned server repeatedly until timeout
254 /// exceeded. 232 /// exceeded.
255 fn connect_spawned( 233 async fn connect_spawned(
256 self, 234 &mut self,
257 server: Child, 235 mut server: Child,
258 sock_path: PathBuf, 236 sock_path: &Path,
259 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> { 237 ) -> io::Result<ChgClient> {
260 debug!("try connect to {} repeatedly", sock_path.display()); 238 debug!("try connect to {} repeatedly", sock_path.display());
261 let connect = future::loop_fn(sock_path, |sock_path| {
262 UnixClient::connect(sock_path.clone()).then(|res| {
263 match res {
264 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
265 Err(_) => {
266 // try again with slight delay
267 let fut = time::delay_for(Duration::from_millis(10))
268 .map(|()| Loop::Continue(sock_path))
269 .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
270 Either::B(fut)
271 }
272 }
273 })
274 });
275
276 // waits for either connection established or server failed to start 239 // waits for either connection established or server failed to start
277 connect 240 let start_time = Instant::now();
278 .select2(server) 241 while start_time.elapsed() < self.timeout {
279 .map_err(|res| res.split().0) 242 if let Ok(client) = ChgClient::connect(&sock_path).await {
280 .timeout(self.timeout) 243 // server handle is dropped here, but the detached process
281 .map_err(|err| { 244 // will continue running in background
282 err.into_inner().unwrap_or_else(|| { 245 return Ok(client);
283 io::Error::new( 246 }
284 io::ErrorKind::TimedOut, 247
285 "timed out while connecting to server", 248 if let Some(st) = server.try_wait()? {
286 ) 249 return Err(io::Error::new(
287 }) 250 io::ErrorKind::Other,
288 }) 251 format!("server exited too early: {}", st),
289 .and_then(|res| { 252 ));
290 match res { 253 }
291 Either::A(((client, sock_path), server)) => { 254
292 server.forget(); // continue to run in background 255 // try again with slight delay
293 Ok((self, client, sock_path)) 256 time::delay_for(Duration::from_millis(10)).await;
294 } 257 }
295 Either::B((st, _)) => Err(io::Error::new( 258
296 io::ErrorKind::Other, 259 Err(io::Error::new(
297 format!("server exited too early: {}", st), 260 io::ErrorKind::TimedOut,
298 )), 261 "timed out while connecting to server",
299 } 262 ))
300 })
301 } 263 }
302 } 264 }
303 265
304 /// Determines the server socket to connect to. 266 /// Determines the server socket to connect to.
305 /// 267 ///