rust/hg-cpython/src/copy_tracing.rs
changeset 46589 620c88fb42a2
parent 46588 47557ea79fc7
child 46590 8d20abed6a1e
equal deleted inserted replaced
46588:47557ea79fc7 46589:620c88fb42a2
     9 use cpython::Python;
     9 use cpython::Python;
    10 
    10 
    11 use hg::copy_tracing::ChangedFiles;
    11 use hg::copy_tracing::ChangedFiles;
    12 use hg::copy_tracing::CombineChangesetCopies;
    12 use hg::copy_tracing::CombineChangesetCopies;
    13 use hg::Revision;
    13 use hg::Revision;
       
    14 
       
    15 use self::pybytes_with_data::PyBytesWithData;
       
    16 
       
    17 // Module to encapsulate private fields
       
    18 mod pybytes_with_data {
       
    19     use cpython::{PyBytes, Python};
       
    20 
       
    21     /// Safe abstraction over a `PyBytes` together with the `&[u8]` slice
       
    22     /// that borrows it.
       
    23     ///
       
    24     /// Calling `PyBytes::data` requires a GIL marker but we want to access the
       
    25     /// data in a thread that (ideally) does not need to acquire the GIL.
       
    26     /// This type allows separating the call and the use.
       
    27     pub(super) struct PyBytesWithData {
       
    28         #[allow(unused)] // never read: only held so that `data` stays valid
       
    29         keep_alive: PyBytes,
       
    30 
       
    31         /// Borrows the buffer inside `self.keep_alive`,
       
    32         /// but the borrow-checker cannot express self-referential structs.
       
    33         data: *const [u8],
       
    34     }
       
    35 
       
    36     fn require_send<T: Send>() {}
       
    37 
       
    38     #[allow(unused)] // never called: exists only for its compile-time check
       
    39     fn static_assert_pybytes_is_send() {
       
    40         require_send::<PyBytes>;
       
    41     }
       
    42 
       
    43     // Safety: PyBytes is Send. Raw pointers are not Send by default,
       
    44     // but here sending one to another thread is fine since we ensure it stays
       
    45     // valid: the owning `keep_alive` always moves together with the pointer.
       
    46     unsafe impl Send for PyBytesWithData {}
       
    47 
       
    48     impl PyBytesWithData {
       
    49         pub fn new(py: Python, bytes: PyBytes) -> Self {
       
    50             Self {
       
    51                 data: bytes.data(py),
       
    52                 keep_alive: bytes,
       
    53             }
       
    54         }
       
    55 
       
    56         pub fn data(&self) -> &[u8] {
       
    57             // Safety: the raw pointer is valid as long as the PyBytes is still
       
    58             // alive, and the returned slice borrows `self`.
       
    59             unsafe { &*self.data }
       
    60         }
       
    61     }
       
    62 }
    14 
    63 
    15 /// Combines copies information contained into revision `revs` to build a copy
    64 /// Combines copies information contained into revision `revs` to build a copy
    16 /// map.
    65 /// map.
    17 ///
    66 ///
    18 /// See mercurial/copies.py for details
    67 /// See mercurial/copies.py for details
    29         .iter()
    78         .iter()
    30         .map(|(k, v)| Ok((k.extract(py)?, v.extract(py)?)))
    79         .map(|(k, v)| Ok((k.extract(py)?, v.extract(py)?)))
    31         .collect::<PyResult<_>>()?;
    80         .collect::<PyResult<_>>()?;
    32 
    81 
    33     /// (Revision number, parent 1, parent 2, copy data for this revision)
    82     /// (Revision number, parent 1, parent 2, copy data for this revision)
    34     type RevInfo = (Revision, Revision, Revision, Option<PyBytes>);
    83     type RevInfo<Bytes> = (Revision, Revision, Revision, Option<Bytes>);
    35 
    84 
    36     let revs_info = revs.iter(py).map(|rev_py| -> PyResult<RevInfo> {
    85     let revs_info =
    37         let rev = rev_py.extract(py)?;
    86         revs.iter(py).map(|rev_py| -> PyResult<RevInfo<PyBytes>> {
    38         let tuple: PyTuple =
    87             let rev = rev_py.extract(py)?;
    39             rev_info.call(py, (rev_py,), None)?.cast_into(py)?;
    88             let tuple: PyTuple =
    40         let p1 = tuple.get_item(py, 0).extract(py)?;
    89                 rev_info.call(py, (rev_py,), None)?.cast_into(py)?;
    41         let p2 = tuple.get_item(py, 1).extract(py)?;
    90             let p1 = tuple.get_item(py, 0).extract(py)?;
    42         let opt_bytes = tuple.get_item(py, 2).extract(py)?;
    91             let p2 = tuple.get_item(py, 1).extract(py)?;
    43         Ok((rev, p1, p2, opt_bytes))
    92             let opt_bytes = tuple.get_item(py, 2).extract(py)?;
    44     });
    93             Ok((rev, p1, p2, opt_bytes))
       
    94         });
    45 
    95 
    46     let path_copies = if !multi_thread {
    96     let path_copies = if !multi_thread {
    47         let mut combine_changeset_copies =
    97         let mut combine_changeset_copies =
    48             CombineChangesetCopies::new(children_count);
    98             CombineChangesetCopies::new(children_count);
    49 
    99 
    65         // is to gather data for them, an unbounded channel would keep
   115         // is to gather data for them, an unbounded channel would keep
    66         // growing and eat memory.
   116         // growing and eat memory.
    67         //
   117         //
    68         // TODO: tweak the bound?
   118         // TODO: tweak the bound?
    69         let (rev_info_sender, rev_info_receiver) =
   119         let (rev_info_sender, rev_info_receiver) =
    70             crossbeam_channel::bounded::<RevInfo>(1000);
   120             crossbeam_channel::bounded::<RevInfo<PyBytesWithData>>(1000);
    71 
   121 
    72         // Start a thread that does CPU-heavy processing in parallel with the
   122         // Start a thread that does CPU-heavy processing in parallel with the
    73         // loop below.
   123         // loop below.
    74         //
   124         //
    75         // If the parent thread panics, `rev_info_sender` will be dropped and
   125         // If the parent thread panics, `rev_info_sender` will be dropped and
    77         // exit its own loop.
   127         // exit its own loop.
    78         let thread = std::thread::spawn(move || {
   128         let thread = std::thread::spawn(move || {
    79             let mut combine_changeset_copies =
   129             let mut combine_changeset_copies =
    80                 CombineChangesetCopies::new(children_count);
   130                 CombineChangesetCopies::new(children_count);
    81             for (rev, p1, p2, opt_bytes) in rev_info_receiver {
   131             for (rev, p1, p2, opt_bytes) in rev_info_receiver {
    82                 let gil = Python::acquire_gil();
       
    83                 let py = gil.python();
       
    84                 let files = match &opt_bytes {
   132                 let files = match &opt_bytes {
    85                     Some(raw) => ChangedFiles::new(raw.data(py)),
   133                     Some(raw) => ChangedFiles::new(raw.data()),
    86                     // Python None was extracted to Option::None,
   134                     // Python None was extracted to Option::None,
    87                     // meaning there was no copy data.
   135                     // meaning there was no copy data.
    88                     None => ChangedFiles::new_empty(),
   136                     None => ChangedFiles::new_empty(),
    89                 };
   137                 };
    90                 combine_changeset_copies.add_revision(rev, p1, p2, files)
   138                 combine_changeset_copies.add_revision(rev, p1, p2, files)
       
   139 
       
   140                 // The GIL is (still) implicitly acquired here through
       
   141                 // `impl Drop for PyBytes`.
    91             }
   142             }
    92 
   143 
    93             combine_changeset_copies.finish(target_rev)
   144             combine_changeset_copies.finish(target_rev)
    94         });
   145         });
    95 
   146 
    96         for rev_info in revs_info {
   147         for rev_info in revs_info {
    97             let (rev, p1, p2, opt_bytes) = rev_info?;
   148             let (rev, p1, p2, opt_bytes) = rev_info?;
       
   149             let opt_bytes = opt_bytes.map(|b| PyBytesWithData::new(py, b));
    98 
   150 
    99             // We’d prefer to avoid the child thread calling into Python code,
   151             // We’d prefer to avoid the child thread calling into Python code,
   100             // but this avoids a potential deadlock on the GIL if it does:
   152             // but this avoids a potential deadlock on the GIL if it does:
   101             py.allow_threads(|| {
   153             py.allow_threads(|| {
   102                 rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
   154                 rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(