3 // This software may be used and distributed according to the terms of the |
3 // This software may be used and distributed according to the terms of the |
4 // GNU General Public License version 2 or any later version. |
4 // GNU General Public License version 2 or any later version. |
5 |
5 |
6 //! Utility for locating command-server process. |
6 //! Utility for locating command-server process. |
7 |
7 |
|
8 use futures::future::{self, Either, Loop}; |
8 use std::env; |
9 use std::env; |
9 use std::ffi::{OsStr, OsString}; |
10 use std::ffi::{OsStr, OsString}; |
10 use std::fs::{self, DirBuilder}; |
11 use std::fs::{self, DirBuilder}; |
11 use std::io; |
12 use std::io; |
12 use std::os::unix::ffi::{OsStrExt, OsStringExt}; |
13 use std::os::unix::ffi::{OsStrExt, OsStringExt}; |
13 use std::os::unix::fs::{DirBuilderExt, MetadataExt}; |
14 use std::os::unix::fs::{DirBuilderExt, MetadataExt}; |
14 use std::path::{Path, PathBuf}; |
15 use std::path::{Path, PathBuf}; |
15 use std::process; |
16 use std::process::{self, Command}; |
16 use std::time::Duration; |
17 use std::time::Duration; |
|
18 use tokio::prelude::*; |
|
19 use tokio_hglib::UnixClient; |
|
20 use tokio_process::{Child, CommandExt}; |
|
21 use tokio_timer; |
17 |
22 |
18 use super::procutil; |
23 use super::procutil; |
19 |
24 |
20 /// Helper to connect to and spawn a server process. |
25 /// Helper to connect to and spawn a server process. |
21 #[derive(Clone, Debug)] |
26 #[derive(Clone, Debug)] |
50 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len() |
55 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len() |
51 buf.extend_from_slice(src); |
56 buf.extend_from_slice(src); |
52 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes()); |
57 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes()); |
53 OsString::from_vec(buf).into() |
58 OsString::from_vec(buf).into() |
54 } |
59 } |
|
60 |
|
61 /// Connects to the server. |
|
62 /// |
|
63 /// The server process will be spawned if not running. |
|
64 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { |
|
65 self.try_connect() |
|
66 } |
|
67 |
|
68 /// Tries to connect to the existing server, or spawns new if not running. |
|
69 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { |
|
70 debug!("try connect to {}", self.base_sock_path.display()); |
|
71 UnixClient::connect(self.base_sock_path.clone()).then(|res| match res { |
|
72 Ok(client) => Either::A(future::ok((self, client))), |
|
73 Err(_) => Either::B(self.spawn_connect()), |
|
74 }) |
|
75 } |
|
76 |
|
77 /// Spawns new server process and connects to it. |
|
78 /// |
|
79 /// The server will be spawned at the current working directory, then |
|
80 /// chdir to "/", so that the server will load configs from the target |
|
81 /// repository. |
|
82 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { |
|
83 let sock_path = self.temp_sock_path(); |
|
84 debug!("start cmdserver at {}", sock_path.display()); |
|
85 Command::new(&self.hg_command) |
|
86 .arg("serve") |
|
87 .arg("--cmdserver") |
|
88 .arg("chgunix") |
|
89 .arg("--address") |
|
90 .arg(&sock_path) |
|
91 .arg("--daemon-postexec") |
|
92 .arg("chdir:/") |
|
93 .current_dir(&self.current_dir) |
|
94 .env_clear() |
|
95 .envs(self.env_vars.iter().cloned()) |
|
96 .env("CHGINTERNALMARK", "") |
|
97 .spawn_async() |
|
98 .into_future() |
|
99 .and_then(|server| self.connect_spawned(server, sock_path)) |
|
100 .and_then(|(loc, client, sock_path)| { |
|
101 debug!( |
|
102 "rename {} to {}", |
|
103 sock_path.display(), |
|
104 loc.base_sock_path.display() |
|
105 ); |
|
106 fs::rename(&sock_path, &loc.base_sock_path)?; |
|
107 Ok((loc, client)) |
|
108 }) |
|
109 } |
|
110 |
|
111 /// Tries to connect to the just spawned server repeatedly until timeout |
|
112 /// exceeded. |
|
113 fn connect_spawned( |
|
114 self, |
|
115 server: Child, |
|
116 sock_path: PathBuf, |
|
117 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> { |
|
118 debug!("try connect to {} repeatedly", sock_path.display()); |
|
119 let connect = future::loop_fn(sock_path, |sock_path| { |
|
120 UnixClient::connect(sock_path.clone()).then(|res| { |
|
121 match res { |
|
122 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))), |
|
123 Err(_) => { |
|
124 // try again with slight delay |
|
125 let fut = tokio_timer::sleep(Duration::from_millis(10)) |
|
126 .map(|()| Loop::Continue(sock_path)) |
|
127 .map_err(|err| io::Error::new(io::ErrorKind::Other, err)); |
|
128 Either::B(fut) |
|
129 } |
|
130 } |
|
131 }) |
|
132 }); |
|
133 |
|
134 // waits for either connection established or server failed to start |
|
135 connect |
|
136 .select2(server) |
|
137 .map_err(|res| res.split().0) |
|
138 .timeout(self.timeout) |
|
139 .map_err(|err| { |
|
140 err.into_inner().unwrap_or_else(|| { |
|
141 io::Error::new( |
|
142 io::ErrorKind::TimedOut, |
|
143 "timed out while connecting to server", |
|
144 ) |
|
145 }) |
|
146 }) |
|
147 .and_then(|res| { |
|
148 match res { |
|
149 Either::A(((client, sock_path), server)) => { |
|
150 server.forget(); // continue to run in background |
|
151 Ok((self, client, sock_path)) |
|
152 } |
|
153 Either::B((st, _)) => Err(io::Error::new( |
|
154 io::ErrorKind::Other, |
|
155 format!("server exited too early: {}", st), |
|
156 )), |
|
157 } |
|
158 }) |
|
159 } |
55 } |
160 } |
56 |
161 |
57 /// Determines the server socket to connect to. |
162 /// Determines the server socket to connect to. |
58 /// |
163 /// |
59 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created |
164 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created |
60 /// as necessary. |
165 /// as necessary. |
61 pub fn prepare_server_socket_path() -> io::Result<PathBuf> { |
166 fn prepare_server_socket_path() -> io::Result<PathBuf> { |
62 if let Some(s) = env::var_os("CHGSOCKNAME") { |
167 if let Some(s) = env::var_os("CHGSOCKNAME") { |
63 Ok(PathBuf::from(s)) |
168 Ok(PathBuf::from(s)) |
64 } else { |
169 } else { |
65 let mut path = default_server_socket_dir(); |
170 let mut path = default_server_socket_dir(); |
66 create_secure_dir(&path)?; |
171 create_secure_dir(&path)?; |