Mercurial > hg
comparison rust/chg/src/locator.rs @ 44753:a347a329e48d
rust-chg: reimplement locator by using async/await and tokio-0.2
connect_spawned() is rewritten from scratch using std::process. Previously,
it would select on the completion of either the connection or the server
process. The new code could be implemented the same way, but it's much simpler
to occasionally call try_wait() to detect server death.
Differential Revision: https://phab.mercurial-scm.org/D8447
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Sat, 11 Apr 2020 00:47:32 +0900 |
parents | e9e44e61042b |
children | 27fe8cc1338f |
comparison
equal
deleted
inserted
replaced
44752:d6f706929120 | 44753:a347a329e48d |
---|---|
3 // This software may be used and distributed according to the terms of the | 3 // This software may be used and distributed according to the terms of the |
4 // GNU General Public License version 2 or any later version. | 4 // GNU General Public License version 2 or any later version. |
5 | 5 |
6 //! Utility for locating command-server process. | 6 //! Utility for locating command-server process. |
7 | 7 |
8 use futures::future::{self, Either, Loop}; | |
9 use log::debug; | 8 use log::debug; |
10 use std::env; | 9 use std::env; |
11 use std::ffi::{OsStr, OsString}; | 10 use std::ffi::{OsStr, OsString}; |
12 use std::fs::{self, DirBuilder}; | 11 use std::fs::{self, DirBuilder}; |
13 use std::io; | 12 use std::io; |
14 use std::os::unix::ffi::{OsStrExt, OsStringExt}; | 13 use std::os::unix::ffi::{OsStrExt, OsStringExt}; |
15 use std::os::unix::fs::{DirBuilderExt, MetadataExt}; | 14 use std::os::unix::fs::{DirBuilderExt, MetadataExt}; |
16 use std::path::{Path, PathBuf}; | 15 use std::path::{Path, PathBuf}; |
17 use std::process; | 16 use std::process::{self, Child, Command}; |
18 use std::time::Duration; | 17 use std::time::{Duration, Instant}; |
19 use tokio::prelude::*; | |
20 use tokio::process::{Child, Command}; | |
21 use tokio::time; | 18 use tokio::time; |
22 use tokio_hglib::UnixClient; | 19 |
23 | 20 use crate::clientext::ChgClient; |
24 use crate::clientext::ChgClientExt; | |
25 use crate::message::{Instruction, ServerSpec}; | 21 use crate::message::{Instruction, ServerSpec}; |
26 use crate::procutil; | 22 use crate::procutil; |
27 | 23 |
28 const REQUIRED_SERVER_CAPABILITIES: &[&str] = &[ | 24 const REQUIRED_SERVER_CAPABILITIES: &[&str] = &[ |
29 "attachio", | 25 "attachio", |
80 } | 76 } |
81 | 77 |
82 /// Connects to the server. | 78 /// Connects to the server. |
83 /// | 79 /// |
84 /// The server process will be spawned if not running. | 80 /// The server process will be spawned if not running. |
85 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { | 81 pub async fn connect(&mut self) -> io::Result<ChgClient> { |
86 future::loop_fn((self, 0), |(loc, cnt)| { | 82 for _cnt in 0..10 { |
87 if cnt < 10 { | 83 let mut client = self.try_connect().await?; |
88 let fut = loc | 84 let instructions = client.validate(&self.hg_early_args).await?; |
89 .try_connect() | 85 let reconnect = self.run_instructions(&instructions)?; |
90 .and_then(|(loc, client)| { | 86 if !reconnect { |
91 client | 87 return Ok(client); |
92 .validate(&loc.hg_early_args) | 88 } |
93 .map(|(client, instructions)| (loc, client, instructions)) | 89 } |
94 }) | 90 |
95 .and_then(move |(loc, client, instructions)| { | 91 // TODO: unindent |
96 loc.run_instructions(client, instructions, cnt) | 92 { |
97 }); | 93 { |
98 Either::A(fut) | |
99 } else { | |
100 let msg = format!( | 94 let msg = format!( |
101 concat!( | 95 concat!( |
102 "too many redirections.\n", | 96 "too many redirections.\n", |
103 "Please make sure {:?} is not a wrapper which ", | 97 "Please make sure {:?} is not a wrapper which ", |
104 "changes sensitive environment variables ", | 98 "changes sensitive environment variables ", |
105 "before executing hg. If you have to use a ", | 99 "before executing hg. If you have to use a ", |
106 "wrapper, wrap chg instead of hg.", | 100 "wrapper, wrap chg instead of hg.", |
107 ), | 101 ), |
108 loc.hg_command | 102 self.hg_command |
109 ); | 103 ); |
110 Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg))) | 104 Err(io::Error::new(io::ErrorKind::Other, msg)) |
111 } | 105 } |
112 }) | 106 } |
113 } | 107 } |
114 | 108 |
115 /// Runs instructions received from the server. | 109 /// Runs instructions received from the server. |
116 fn run_instructions( | 110 /// |
117 mut self, | 111 /// Returns true if the client should try connecting to the other server. |
118 client: UnixClient, | 112 fn run_instructions(&mut self, instructions: &[Instruction]) -> io::Result<bool> { |
119 instructions: Vec<Instruction>, | |
120 cnt: usize, | |
121 ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> { | |
122 let mut reconnect = false; | 113 let mut reconnect = false; |
123 for inst in instructions { | 114 for inst in instructions { |
124 debug!("instruction: {:?}", inst); | 115 debug!("instruction: {:?}", inst); |
125 match inst { | 116 match inst { |
126 Instruction::Exit(_) => { | 117 Instruction::Exit(_) => { |
127 // Just returns the current connection to run the | 118 // Just returns the current connection to run the |
128 // unparsable command and report the error | 119 // unparsable command and report the error |
129 return Ok(Loop::Break((self, client))); | 120 return Ok(false); |
130 } | 121 } |
131 Instruction::Reconnect => { | 122 Instruction::Reconnect => { |
132 reconnect = true; | 123 reconnect = true; |
133 } | 124 } |
134 Instruction::Redirect(path) => { | 125 Instruction::Redirect(path) => { |
137 "insecure redirect instruction from server: {}", | 128 "insecure redirect instruction from server: {}", |
138 path.display() | 129 path.display() |
139 ); | 130 ); |
140 return Err(io::Error::new(io::ErrorKind::InvalidData, msg)); | 131 return Err(io::Error::new(io::ErrorKind::InvalidData, msg)); |
141 } | 132 } |
142 self.redirect_sock_path = Some(path); | 133 self.redirect_sock_path = Some(path.to_owned()); |
143 reconnect = true; | 134 reconnect = true; |
144 } | 135 } |
145 Instruction::Unlink(path) => { | 136 Instruction::Unlink(path) => { |
146 if path.parent() != self.base_sock_path.parent() { | 137 if path.parent() != self.base_sock_path.parent() { |
147 let msg = format!( | 138 let msg = format!( |
153 fs::remove_file(path).unwrap_or(()); // may race | 144 fs::remove_file(path).unwrap_or(()); // may race |
154 } | 145 } |
155 } | 146 } |
156 } | 147 } |
157 | 148 |
158 if reconnect { | 149 Ok(reconnect) |
159 Ok(Loop::Continue((self, cnt + 1))) | |
160 } else { | |
161 Ok(Loop::Break((self, client))) | |
162 } | |
163 } | 150 } |
164 | 151 |
165 /// Tries to connect to the existing server, or spawns new if not running. | 152 /// Tries to connect to the existing server, or spawns new if not running. |
166 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { | 153 async fn try_connect(&mut self) -> io::Result<ChgClient> { |
167 let sock_path = self | 154 let sock_path = self |
168 .redirect_sock_path | 155 .redirect_sock_path |
169 .as_ref() | 156 .as_ref() |
170 .unwrap_or(&self.base_sock_path) | 157 .unwrap_or(&self.base_sock_path) |
171 .clone(); | 158 .clone(); |
172 debug!("try connect to {}", sock_path.display()); | 159 debug!("try connect to {}", sock_path.display()); |
173 UnixClient::connect(sock_path) | 160 // TODO: unindent |
174 .then(|res| { | 161 { |
175 match res { | 162 { |
176 Ok(client) => Either::A(future::ok((self, client))), | 163 let mut client = match ChgClient::connect(sock_path).await { |
164 Ok(client) => client, | |
177 Err(_) => { | 165 Err(_) => { |
178 // Prevent us from being re-connected to the outdated | 166 // Prevent us from being re-connected to the outdated |
179 // master server: We were told by the server to redirect | 167 // master server: We were told by the server to redirect |
180 // to redirect_sock_path, which didn't work. We do not | 168 // to redirect_sock_path, which didn't work. We do not |
181 // want to connect to the same master server again | 169 // want to connect to the same master server again |
182 // because it would probably tell us the same thing. | 170 // because it would probably tell us the same thing. |
183 if self.redirect_sock_path.is_some() { | 171 if self.redirect_sock_path.is_some() { |
184 fs::remove_file(&self.base_sock_path).unwrap_or(()); | 172 fs::remove_file(&self.base_sock_path).unwrap_or(()); |
185 // may race | 173 // may race |
186 } | 174 } |
187 Either::B(self.spawn_connect()) | 175 self.spawn_connect().await? |
188 } | 176 } |
189 } | 177 }; |
190 }) | |
191 .and_then(|(loc, client)| { | |
192 check_server_capabilities(client.server_spec())?; | 178 check_server_capabilities(client.server_spec())?; |
193 Ok((loc, client)) | |
194 }) | |
195 .and_then(|(loc, client)| { | |
196 // It's purely optional, and the server might not support this command. | 179 // It's purely optional, and the server might not support this command. |
197 if client.server_spec().capabilities.contains("setprocname") { | 180 if client.server_spec().capabilities.contains("setprocname") { |
198 let fut = client | 181 client |
199 .set_process_name(format!("chg[worker/{}]", loc.process_id)) | 182 .set_process_name(format!("chg[worker/{}]", self.process_id)) |
200 .map(|client| (loc, client)); | 183 .await?; |
201 Either::A(fut) | 184 } |
202 } else { | 185 client.set_current_dir(&self.current_dir).await?; |
203 Either::B(future::ok((loc, client))) | |
204 } | |
205 }) | |
206 .and_then(|(loc, client)| { | |
207 client | 186 client |
208 .set_current_dir(&loc.current_dir) | 187 .set_env_vars_os(self.env_vars.iter().cloned()) |
209 .map(|client| (loc, client)) | 188 .await?; |
210 }) | 189 Ok(client) |
211 .and_then(|(loc, client)| { | 190 } |
212 client | 191 } |
213 .set_env_vars_os(loc.env_vars.iter().cloned()) | |
214 .map(|client| (loc, client)) | |
215 }) | |
216 } | 192 } |
217 | 193 |
218 /// Spawns new server process and connects to it. | 194 /// Spawns new server process and connects to it. |
219 /// | 195 /// |
220 /// The server will be spawned at the current working directory, then | 196 /// The server will be spawned at the current working directory, then |
221 /// chdir to "/", so that the server will load configs from the target | 197 /// chdir to "/", so that the server will load configs from the target |
222 /// repository. | 198 /// repository. |
223 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { | 199 async fn spawn_connect(&mut self) -> io::Result<ChgClient> { |
224 let sock_path = self.temp_sock_path(); | 200 let sock_path = self.temp_sock_path(); |
225 debug!("start cmdserver at {}", sock_path.display()); | 201 debug!("start cmdserver at {}", sock_path.display()); |
226 Command::new(&self.hg_command) | 202 let server = Command::new(&self.hg_command) |
227 .arg("serve") | 203 .arg("serve") |
228 .arg("--cmdserver") | 204 .arg("--cmdserver") |
229 .arg("chgunix") | 205 .arg("chgunix") |
230 .arg("--address") | 206 .arg("--address") |
231 .arg(&sock_path) | 207 .arg(&sock_path) |
234 .args(&self.hg_early_args) | 210 .args(&self.hg_early_args) |
235 .current_dir(&self.current_dir) | 211 .current_dir(&self.current_dir) |
236 .env_clear() | 212 .env_clear() |
237 .envs(self.env_vars.iter().cloned()) | 213 .envs(self.env_vars.iter().cloned()) |
238 .env("CHGINTERNALMARK", "") | 214 .env("CHGINTERNALMARK", "") |
239 .spawn() | 215 .spawn()?; |
240 .into_future() | 216 let client = self.connect_spawned(server, &sock_path).await?; |
241 .and_then(|server| self.connect_spawned(server, sock_path)) | 217 // TODO: unindent |
242 .and_then(|(loc, client, sock_path)| { | 218 { |
219 { | |
243 debug!( | 220 debug!( |
244 "rename {} to {}", | 221 "rename {} to {}", |
245 sock_path.display(), | 222 sock_path.display(), |
246 loc.base_sock_path.display() | 223 self.base_sock_path.display() |
247 ); | 224 ); |
248 fs::rename(&sock_path, &loc.base_sock_path)?; | 225 fs::rename(&sock_path, &self.base_sock_path)?; |
249 Ok((loc, client)) | 226 Ok(client) |
250 }) | 227 } |
228 } | |
251 } | 229 } |
252 | 230 |
253 /// Tries to connect to the just spawned server repeatedly until timeout | 231 /// Tries to connect to the just spawned server repeatedly until timeout |
254 /// exceeded. | 232 /// exceeded. |
255 fn connect_spawned( | 233 async fn connect_spawned( |
256 self, | 234 &mut self, |
257 server: Child, | 235 mut server: Child, |
258 sock_path: PathBuf, | 236 sock_path: &Path, |
259 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> { | 237 ) -> io::Result<ChgClient> { |
260 debug!("try connect to {} repeatedly", sock_path.display()); | 238 debug!("try connect to {} repeatedly", sock_path.display()); |
261 let connect = future::loop_fn(sock_path, |sock_path| { | |
262 UnixClient::connect(sock_path.clone()).then(|res| { | |
263 match res { | |
264 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))), | |
265 Err(_) => { | |
266 // try again with slight delay | |
267 let fut = time::delay_for(Duration::from_millis(10)) | |
268 .map(|()| Loop::Continue(sock_path)) | |
269 .map_err(|err| io::Error::new(io::ErrorKind::Other, err)); | |
270 Either::B(fut) | |
271 } | |
272 } | |
273 }) | |
274 }); | |
275 | |
276 // waits for either connection established or server failed to start | 239 // waits for either connection established or server failed to start |
277 connect | 240 let start_time = Instant::now(); |
278 .select2(server) | 241 while start_time.elapsed() < self.timeout { |
279 .map_err(|res| res.split().0) | 242 if let Ok(client) = ChgClient::connect(&sock_path).await { |
280 .timeout(self.timeout) | 243 // server handle is dropped here, but the detached process |
281 .map_err(|err| { | 244 // will continue running in background |
282 err.into_inner().unwrap_or_else(|| { | 245 return Ok(client); |
283 io::Error::new( | 246 } |
284 io::ErrorKind::TimedOut, | 247 |
285 "timed out while connecting to server", | 248 if let Some(st) = server.try_wait()? { |
286 ) | 249 return Err(io::Error::new( |
287 }) | 250 io::ErrorKind::Other, |
288 }) | 251 format!("server exited too early: {}", st), |
289 .and_then(|res| { | 252 )); |
290 match res { | 253 } |
291 Either::A(((client, sock_path), server)) => { | 254 |
292 server.forget(); // continue to run in background | 255 // try again with slight delay |
293 Ok((self, client, sock_path)) | 256 time::delay_for(Duration::from_millis(10)).await; |
294 } | 257 } |
295 Either::B((st, _)) => Err(io::Error::new( | 258 |
296 io::ErrorKind::Other, | 259 Err(io::Error::new( |
297 format!("server exited too early: {}", st), | 260 io::ErrorKind::TimedOut, |
298 )), | 261 "timed out while connecting to server", |
299 } | 262 )) |
300 }) | |
301 } | 263 } |
302 } | 264 } |
303 | 265 |
304 /// Determines the server socket to connect to. | 266 /// Determines the server socket to connect to. |
305 /// | 267 /// |