comparison rust/chg/src/locator.rs @ 44671:bb936e25a84a

rust-chg: spawn server process if not running. This is a minimal reimplementation of gethgcmd(), execcmdserver(), retryconnectcmdserver(), and connectcmdserver() in chg.c. No config validation is implemented yet, and some Py3 workarounds may be missing because this code was written in 2018. Differential Revision: https://phab.mercurial-scm.org/D8360
author Yuya Nishihara <yuya@tcha.org>
date Sat, 06 Oct 2018 20:10:44 +0900
parents c11d98cff883
children 7bf45ed9e25e
comparison
equal deleted inserted replaced
44670:cf144f50f413 44671:bb936e25a84a
3 // This software may be used and distributed according to the terms of the 3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version. 4 // GNU General Public License version 2 or any later version.
5 5
6 //! Utility for locating command-server process. 6 //! Utility for locating command-server process.
7 7
8 use futures::future::{self, Either, Loop};
8 use std::env; 9 use std::env;
9 use std::ffi::{OsStr, OsString}; 10 use std::ffi::{OsStr, OsString};
10 use std::fs::{self, DirBuilder}; 11 use std::fs::{self, DirBuilder};
11 use std::io; 12 use std::io;
12 use std::os::unix::ffi::{OsStrExt, OsStringExt}; 13 use std::os::unix::ffi::{OsStrExt, OsStringExt};
13 use std::os::unix::fs::{DirBuilderExt, MetadataExt}; 14 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
14 use std::path::{Path, PathBuf}; 15 use std::path::{Path, PathBuf};
15 use std::process; 16 use std::process::{self, Command};
16 use std::time::Duration; 17 use std::time::Duration;
18 use tokio::prelude::*;
19 use tokio_hglib::UnixClient;
20 use tokio_process::{Child, CommandExt};
21 use tokio_timer;
17 22
18 use super::procutil; 23 use super::procutil;
19 24
20 /// Helper to connect to and spawn a server process. 25 /// Helper to connect to and spawn a server process.
21 #[derive(Clone, Debug)] 26 #[derive(Clone, Debug)]
50 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len() 55 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
51 buf.extend_from_slice(src); 56 buf.extend_from_slice(src);
52 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes()); 57 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
53 OsString::from_vec(buf).into() 58 OsString::from_vec(buf).into()
54 } 59 }
60
61 /// Connects to the server, consuming this locator.
62 /// On success the future yields the locator itself paired with the connected `UnixClient`; failures surface as `io::Error`.
63 /// The server process will be spawned if not running.
64 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
65 self.try_connect() // currently a thin public wrapper: all of the work happens in try_connect()
66 }
67
68 /// Tries to connect to the existing server at `base_sock_path`; if that fails for any reason, spawns a new server and connects to it instead.
69 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
70 debug!("try connect to {}", self.base_sock_path.display());
71 UnixClient::connect(self.base_sock_path.clone()).then(|res| match res {
72 Ok(client) => Either::A(future::ok((self, client))), // already running: hand back the locator together with the client
73 Err(_) => Either::B(self.spawn_connect()), // the connection error is discarded; every failure is treated as "server not running"
74 })
75 }
76
77 /// Spawns a new server process and connects to it.
78 /// The server is told to listen on the path from `temp_sock_path()`; once a connection is established that path is renamed to `base_sock_path`.
79 /// The server will be spawned at the current working directory, then
80 /// chdir to "/", so that the server will load configs from the target
81 /// repository.
82 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
83 let sock_path = self.temp_sock_path();
84 debug!("start cmdserver at {}", sock_path.display());
85 Command::new(&self.hg_command)
86 .arg("serve")
87 .arg("--cmdserver")
88 .arg("chgunix")
89 .arg("--address")
90 .arg(&sock_path)
91 .arg("--daemon-postexec")
92 .arg("chdir:/")
93 .current_dir(&self.current_dir)
94 .env_clear() // start from an empty environment: only `self.env_vars` plus the variable set below are passed on
95 .envs(self.env_vars.iter().cloned())
96 .env("CHGINTERNALMARK", "")
97 .spawn_async()
98 .into_future() // turn the spawn result into a future so the following steps can be chained with and_then()
99 .and_then(|server| self.connect_spawned(server, sock_path))
100 .and_then(|(loc, client, sock_path)| {
101 debug!(
102 "rename {} to {}",
103 sock_path.display(),
104 loc.base_sock_path.display()
105 );
106 fs::rename(&sock_path, &loc.base_sock_path)?; // only reached after a successful connection; note this is a synchronous std::fs call
107 Ok((loc, client))
108 })
109 }
110
111 /// Tries to connect to the just spawned server repeatedly until timeout
112 /// exceeded. Fails early if the `server` future completes before a connection is made; on success the server is left running in the background.
113 fn connect_spawned(
114 self,
115 server: Child,
116 sock_path: PathBuf,
117 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
118 debug!("try connect to {} repeatedly", sock_path.display());
119 let connect = future::loop_fn(sock_path, |sock_path| {
120 UnixClient::connect(sock_path.clone()).then(|res| {
121 match res {
122 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
123 Err(_) => {
124 // connection failed (the error is discarded): try again with slight delay
125 let fut = tokio_timer::sleep(Duration::from_millis(10))
126 .map(|()| Loop::Continue(sock_path))
127 .map_err(|err| io::Error::new(io::ErrorKind::Other, err)); // wrap the timer error so both branches share io::Error
128 Either::B(fut)
129 }
130 }
131 })
132 });
133
134 // waits for either connection established or server failed to start, whichever happens first
135 connect
136 .select2(server)
137 .map_err(|res| res.split().0) // keep only the error value and drop the other, still unfinished future
138 .timeout(self.timeout) // bounds the whole race, not a single connection attempt
139 .map_err(|err| {
140 err.into_inner().unwrap_or_else(|| { // an error carrying no inner io::Error is reported as a timeout
141 io::Error::new(
142 io::ErrorKind::TimedOut,
143 "timed out while connecting to server",
144 )
145 })
146 })
147 .and_then(|res| {
148 match res {
149 Either::A(((client, sock_path), server)) => { // the connect loop finished first
150 server.forget(); // continue to run in background
151 Ok((self, client, sock_path))
152 }
153 Either::B((st, _)) => Err(io::Error::new( // the server future finished first; `st` is the value it resolved to
154 io::ErrorKind::Other,
155 format!("server exited too early: {}", st),
156 )),
157 }
158 })
159 }
55 } 160 }
56 161
57 /// Determines the server socket to connect to. 162 /// Determines the server socket to connect to.
58 /// 163 ///
59 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created 164 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
60 /// as necessary. 165 /// as necessary.
61 pub fn prepare_server_socket_path() -> io::Result<PathBuf> { 166 fn prepare_server_socket_path() -> io::Result<PathBuf> {
62 if let Some(s) = env::var_os("CHGSOCKNAME") { 167 if let Some(s) = env::var_os("CHGSOCKNAME") {
63 Ok(PathBuf::from(s)) 168 Ok(PathBuf::from(s))
64 } else { 169 } else {
65 let mut path = default_server_socket_dir(); 170 let mut path = default_server_socket_dir();
66 create_secure_dir(&path)?; 171 create_secure_dir(&path)?;