rust/chg/src/locator.rs
changeset 44671 bb936e25a84a
parent 44668 c11d98cff883
child 44672 7bf45ed9e25e
equal deleted inserted replaced
44670:cf144f50f413 44671:bb936e25a84a
     3 // This software may be used and distributed according to the terms of the
     3 // This software may be used and distributed according to the terms of the
     4 // GNU General Public License version 2 or any later version.
     4 // GNU General Public License version 2 or any later version.
     5 
     5 
     6 //! Utility for locating command-server process.
     6 //! Utility for locating command-server process.
     7 
     7 
       
     8 use futures::future::{self, Either, Loop};
     8 use std::env;
     9 use std::env;
     9 use std::ffi::{OsStr, OsString};
    10 use std::ffi::{OsStr, OsString};
    10 use std::fs::{self, DirBuilder};
    11 use std::fs::{self, DirBuilder};
    11 use std::io;
    12 use std::io;
    12 use std::os::unix::ffi::{OsStrExt, OsStringExt};
    13 use std::os::unix::ffi::{OsStrExt, OsStringExt};
    13 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
    14 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
    14 use std::path::{Path, PathBuf};
    15 use std::path::{Path, PathBuf};
    15 use std::process;
    16 use std::process::{self, Command};
    16 use std::time::Duration;
    17 use std::time::Duration;
       
    18 use tokio::prelude::*;
       
    19 use tokio_hglib::UnixClient;
       
    20 use tokio_process::{Child, CommandExt};
       
    21 use tokio_timer;
    17 
    22 
    18 use super::procutil;
    23 use super::procutil;
    19 
    24 
    20 /// Helper to connect to and spawn a server process.
    25 /// Helper to connect to and spawn a server process.
    21 #[derive(Clone, Debug)]
    26 #[derive(Clone, Debug)]
    50         let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
    55         let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
    51         buf.extend_from_slice(src);
    56         buf.extend_from_slice(src);
    52         buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
    57         buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
    53         OsString::from_vec(buf).into()
    58         OsString::from_vec(buf).into()
    54     }
    59     }
       
    60 
       
    61     /// Connects to the server.
       
    62     ///
       
    63     /// The server process will be spawned if not running.
           ///
           /// Consumes the locator. The returned future resolves to the locator
           /// paired with the connected client, or fails with an `io::Error`.
       
    64     pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
       
    65         self.try_connect()
       
    66     }
       
    67 
       
    68     /// Tries to connect to the existing server, or spawns new if not running.
           ///
           /// The connection is first attempted at `base_sock_path`. On success the
           /// future resolves to `(self, client)`; on failure it continues with
           /// `spawn_connect()`.
       
    69     fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
       
    70         debug!("try connect to {}", self.base_sock_path.display());
       
    71         UnixClient::connect(self.base_sock_path.clone()).then(|res| match res {
       
    72             Ok(client) => Either::A(future::ok((self, client))),
       
                   // every connect error is handled the same way: the error value is
                   // discarded and a new server is spawned
    73             Err(_) => Either::B(self.spawn_connect()),
       
    74         })
       
    75     }
       
    76 
       
    77     /// Spawns new server process and connects to it.
       
    78     ///
       
    79     /// The server will be spawned at the current working directory, then
       
    80     /// chdir to "/", so that the server will load configs from the target
       
    81     /// repository.
           ///
           /// The server is told to listen at the path returned by
           /// `temp_sock_path()`. Once a connection has been established, that
           /// socket file is renamed to `base_sock_path`. A failure to spawn,
           /// to connect, or to rename is reported as the future's `io::Error`.
       
    82     fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
       
    83         let sock_path = self.temp_sock_path();
       
    84         debug!("start cmdserver at {}", sock_path.display());
       
    85         Command::new(&self.hg_command)
       
    86             .arg("serve")
       
    87             .arg("--cmdserver")
       
    88             .arg("chgunix")
       
    89             .arg("--address")
       
    90             .arg(&sock_path)
       
    91             .arg("--daemon-postexec")
       
    92             .arg("chdir:/")
       
    93             .current_dir(&self.current_dir)
       
                   // the child's environment is rebuilt from scratch: only the
                   // entries of `env_vars` plus an empty CHGINTERNALMARK are passed
    94             .env_clear()
       
    95             .envs(self.env_vars.iter().cloned())
       
    96             .env("CHGINTERNALMARK", "")
       
    97             .spawn_async()
       
                   // lift the spawn result into a future so that the following
                   // steps can be chained onto it
    98             .into_future()
       
    99             .and_then(|server| self.connect_spawned(server, sock_path))
       
   100             .and_then(|(loc, client, sock_path)| {
       
   101                 debug!(
       
   102                     "rename {} to {}",
       
   103                     sock_path.display(),
       
   104                     loc.base_sock_path.display()
       
   105                 );
       
                       // synchronous std::fs call; an error here is propagated as
                       // the error of the returned future
   106                 fs::rename(&sock_path, &loc.base_sock_path)?;
       
   107                 Ok((loc, client))
       
   108             })
       
   109     }
       
   110 
       
   111     /// Tries to connect to the just spawned server repeatedly until timeout
       
   112     /// exceeded.
           ///
           /// Connection attempts to `sock_path` are retried with a 10ms pause
           /// after each failure. The retry loop is raced against the `server`
           /// child future, and the whole race is bounded by `self.timeout`.
           ///
           /// On success the future resolves to `(self, client, sock_path)`. If the
           /// `server` future completes first, it fails with a "server exited too
           /// early" error.
       
   113     fn connect_spawned(
       
   114         self,
       
   115         server: Child,
       
   116         sock_path: PathBuf,
       
   117     ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
       
   118         debug!("try connect to {} repeatedly", sock_path.display());
       
               // `sock_path` is threaded through the loop state so that it can be
               // handed back to the caller together with the client
   119         let connect = future::loop_fn(sock_path, |sock_path| {
       
   120             UnixClient::connect(sock_path.clone()).then(|res| {
       
   121                 match res {
       
   122                     Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
       
   123                     Err(_) => {
       
   124                         // try again with slight delay
       
   125                         let fut = tokio_timer::sleep(Duration::from_millis(10))
       
   126                             .map(|()| Loop::Continue(sock_path))
       
                                   // a timer failure is wrapped as an `Other` I/O error to
                                   // match the error type of the loop
   127                             .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
       
   128                         Either::B(fut)
       
   129                     }
       
   130                 }
       
   131             })
       
   132         });
       
   133 
       
   134         // waits for either connection established or server failed to start
       
   135         connect
       
   136             .select2(server)
       
                   // keep only the first component of the split error value
                   // NOTE(review): presumably the second component is the other,
                   // still unfinished future -- confirm against futures 0.1 docs
   137             .map_err(|res| res.split().0)
       
   138             .timeout(self.timeout)
       
                   // a timeout-layer error that carries no inner error is reported
                   // as `TimedOut`; otherwise the inner error is passed through
   139             .map_err(|err| {
       
   140                 err.into_inner().unwrap_or_else(|| {
       
   141                     io::Error::new(
       
   142                         io::ErrorKind::TimedOut,
       
   143                         "timed out while connecting to server",
       
   144                     )
       
   145                 })
       
   146             })
       
   147             .and_then(|res| {
       
   148                 match res {
       
                           // the connect loop finished first
   149                     Either::A(((client, sock_path), server)) => {
       
   150                         server.forget(); // continue to run in background
       
   151                         Ok((self, client, sock_path))
       
   152                     }
       
                           // the server future finished first; its result `st` is
                           // included in the error message
   153                     Either::B((st, _)) => Err(io::Error::new(
       
   154                         io::ErrorKind::Other,
       
   155                         format!("server exited too early: {}", st),
       
   156                     )),
       
   157                 }
       
   158             })
       
   159     }
    55 }
   160 }
    56 
   161 
    57 /// Determines the server socket to connect to.
   162 /// Determines the server socket to connect to.
    58 ///
   163 ///
    59 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
   164 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
    60 /// as necessary.
   165 /// as necessary.
    61 pub fn prepare_server_socket_path() -> io::Result<PathBuf> {
   166 fn prepare_server_socket_path() -> io::Result<PathBuf> {
    62     if let Some(s) = env::var_os("CHGSOCKNAME") {
   167     if let Some(s) = env::var_os("CHGSOCKNAME") {
    63         Ok(PathBuf::from(s))
   168         Ok(PathBuf::from(s))
    64     } else {
   169     } else {
    65         let mut path = default_server_socket_dir();
   170         let mut path = default_server_socket_dir();
    66         create_secure_dir(&path)?;
   171         create_secure_dir(&path)?;