Mercurial > hg
comparison rust/chg/src/locator.rs @ 44671:bb936e25a84a
rust-chg: spawn server process if not running
This is a minimal reimplementation of gethgcmd(), execcmdserver(),
retryconnectcmdserver(), and connectcmdserver() from chg.c.
No config validation is implemented yet, and some Py3 workarounds may
be missing because I wrote this code in 2018.
Differential Revision: https://phab.mercurial-scm.org/D8360
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Sat, 06 Oct 2018 20:10:44 +0900 |
parents | c11d98cff883 |
children | 7bf45ed9e25e |
comparison
equal
deleted
inserted
replaced
44670:cf144f50f413 | 44671:bb936e25a84a |
---|---|
3 // This software may be used and distributed according to the terms of the | 3 // This software may be used and distributed according to the terms of the |
4 // GNU General Public License version 2 or any later version. | 4 // GNU General Public License version 2 or any later version. |
5 | 5 |
6 //! Utility for locating command-server process. | 6 //! Utility for locating command-server process. |
7 | 7 |
8 use futures::future::{self, Either, Loop}; | |
8 use std::env; | 9 use std::env; |
9 use std::ffi::{OsStr, OsString}; | 10 use std::ffi::{OsStr, OsString}; |
10 use std::fs::{self, DirBuilder}; | 11 use std::fs::{self, DirBuilder}; |
11 use std::io; | 12 use std::io; |
12 use std::os::unix::ffi::{OsStrExt, OsStringExt}; | 13 use std::os::unix::ffi::{OsStrExt, OsStringExt}; |
13 use std::os::unix::fs::{DirBuilderExt, MetadataExt}; | 14 use std::os::unix::fs::{DirBuilderExt, MetadataExt}; |
14 use std::path::{Path, PathBuf}; | 15 use std::path::{Path, PathBuf}; |
15 use std::process; | 16 use std::process::{self, Command}; |
16 use std::time::Duration; | 17 use std::time::Duration; |
18 use tokio::prelude::*; | |
19 use tokio_hglib::UnixClient; | |
20 use tokio_process::{Child, CommandExt}; | |
21 use tokio_timer; | |
17 | 22 |
18 use super::procutil; | 23 use super::procutil; |
19 | 24 |
20 /// Helper to connect to and spawn a server process. | 25 /// Helper to connect to and spawn a server process. |
21 #[derive(Clone, Debug)] | 26 #[derive(Clone, Debug)] |
50 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len() | 55 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len() |
51 buf.extend_from_slice(src); | 56 buf.extend_from_slice(src); |
52 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes()); | 57 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes()); |
53 OsString::from_vec(buf).into() | 58 OsString::from_vec(buf).into() |
54 } | 59 } |
60 | |
61 /// Connects to the server. | |
62 /// | |
63 /// The server process will be spawned if not running. | |
64 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { | |
65 self.try_connect() | |
66 } | |
67 | |
68 /// Tries to connect to the existing server, or spawns new if not running. | |
69 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { | |
70 debug!("try connect to {}", self.base_sock_path.display()); | |
71 UnixClient::connect(self.base_sock_path.clone()).then(|res| match res { | |
72 Ok(client) => Either::A(future::ok((self, client))), | |
73 Err(_) => Either::B(self.spawn_connect()), | |
74 }) | |
75 } | |
76 | |
77 /// Spawns new server process and connects to it. | |
78 /// | |
79 /// The server will be spawned at the current working directory, then | |
80 /// chdir to "/", so that the server will load configs from the target | |
81 /// repository. | |
82 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { | |
83 let sock_path = self.temp_sock_path(); | |
84 debug!("start cmdserver at {}", sock_path.display()); | |
85 Command::new(&self.hg_command) | |
86 .arg("serve") | |
87 .arg("--cmdserver") | |
88 .arg("chgunix") | |
89 .arg("--address") | |
90 .arg(&sock_path) | |
91 .arg("--daemon-postexec") | |
92 .arg("chdir:/") | |
93 .current_dir(&self.current_dir) | |
94 .env_clear() | |
95 .envs(self.env_vars.iter().cloned()) | |
96 .env("CHGINTERNALMARK", "") | |
97 .spawn_async() | |
98 .into_future() | |
99 .and_then(|server| self.connect_spawned(server, sock_path)) | |
100 .and_then(|(loc, client, sock_path)| { | |
101 debug!( | |
102 "rename {} to {}", | |
103 sock_path.display(), | |
104 loc.base_sock_path.display() | |
105 ); | |
106 fs::rename(&sock_path, &loc.base_sock_path)?; | |
107 Ok((loc, client)) | |
108 }) | |
109 } | |
110 | |
111 /// Tries to connect to the just spawned server repeatedly until timeout | |
112 /// exceeded. | |
113 fn connect_spawned( | |
114 self, | |
115 server: Child, | |
116 sock_path: PathBuf, | |
117 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> { | |
118 debug!("try connect to {} repeatedly", sock_path.display()); | |
119 let connect = future::loop_fn(sock_path, |sock_path| { | |
120 UnixClient::connect(sock_path.clone()).then(|res| { | |
121 match res { | |
122 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))), | |
123 Err(_) => { | |
124 // try again with slight delay | |
125 let fut = tokio_timer::sleep(Duration::from_millis(10)) | |
126 .map(|()| Loop::Continue(sock_path)) | |
127 .map_err(|err| io::Error::new(io::ErrorKind::Other, err)); | |
128 Either::B(fut) | |
129 } | |
130 } | |
131 }) | |
132 }); | |
133 | |
134 // waits for either connection established or server failed to start | |
135 connect | |
136 .select2(server) | |
137 .map_err(|res| res.split().0) | |
138 .timeout(self.timeout) | |
139 .map_err(|err| { | |
140 err.into_inner().unwrap_or_else(|| { | |
141 io::Error::new( | |
142 io::ErrorKind::TimedOut, | |
143 "timed out while connecting to server", | |
144 ) | |
145 }) | |
146 }) | |
147 .and_then(|res| { | |
148 match res { | |
149 Either::A(((client, sock_path), server)) => { | |
150 server.forget(); // continue to run in background | |
151 Ok((self, client, sock_path)) | |
152 } | |
153 Either::B((st, _)) => Err(io::Error::new( | |
154 io::ErrorKind::Other, | |
155 format!("server exited too early: {}", st), | |
156 )), | |
157 } | |
158 }) | |
159 } | |
55 } | 160 } |
56 | 161 |
57 /// Determines the server socket to connect to. | 162 /// Determines the server socket to connect to. |
58 /// | 163 /// |
59 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created | 164 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created |
60 /// as necessary. | 165 /// as necessary. |
61 pub fn prepare_server_socket_path() -> io::Result<PathBuf> { | 166 fn prepare_server_socket_path() -> io::Result<PathBuf> { |
62 if let Some(s) = env::var_os("CHGSOCKNAME") { | 167 if let Some(s) = env::var_os("CHGSOCKNAME") { |
63 Ok(PathBuf::from(s)) | 168 Ok(PathBuf::from(s)) |
64 } else { | 169 } else { |
65 let mut path = default_server_socket_dir(); | 170 let mut path = default_server_socket_dir(); |
66 create_secure_dir(&path)?; | 171 create_secure_dir(&path)?; |