9 use cpython::Python; |
9 use cpython::Python; |
10 |
10 |
11 use hg::copy_tracing::ChangedFiles; |
11 use hg::copy_tracing::ChangedFiles; |
12 use hg::copy_tracing::CombineChangesetCopies; |
12 use hg::copy_tracing::CombineChangesetCopies; |
13 use hg::Revision; |
13 use hg::Revision; |
|
14 |
|
15 use self::pybytes_with_data::PyBytesWithData; |
|
16 |
|
17 // Module to encapsulate private fields |
|
18 mod pybytes_with_data { |
|
19 use cpython::{PyBytes, Python}; |
|
20 |
|
21 /// Safe abstraction over a `PyBytes` together with the `&[u8]` slice |
|
22 /// that borrows it. |
|
23 /// |
|
24 /// Calling `PyBytes::data` requires a GIL marker but we want to access the |
|
25 /// data in a thread that (ideally) does not need to acquire the GIL. |
|
26 /// This type allows separating the call an the use. |
|
27 pub(super) struct PyBytesWithData { |
|
28 #[allow(unused)] |
|
29 keep_alive: PyBytes, |
|
30 |
|
31 /// Borrows the buffer inside `self.keep_alive`, |
|
32 /// but the borrow-checker cannot express self-referential structs. |
|
33 data: *const [u8], |
|
34 } |
|
35 |
|
36 fn require_send<T: Send>() {} |
|
37 |
|
38 #[allow(unused)] |
|
39 fn static_assert_pybytes_is_send() { |
|
40 require_send::<PyBytes>; |
|
41 } |
|
42 |
|
43 // Safety: PyBytes is Send. Raw pointers are not by default, |
|
44 // but here sending one to another thread is fine since we ensure it stays |
|
45 // valid. |
|
46 unsafe impl Send for PyBytesWithData {} |
|
47 |
|
48 impl PyBytesWithData { |
|
49 pub fn new(py: Python, bytes: PyBytes) -> Self { |
|
50 Self { |
|
51 data: bytes.data(py), |
|
52 keep_alive: bytes, |
|
53 } |
|
54 } |
|
55 |
|
56 pub fn data(&self) -> &[u8] { |
|
57 // Safety: the raw pointer is valid as long as the PyBytes is still |
|
58 // alive, and the returned slice borrows `self`. |
|
59 unsafe { &*self.data } |
|
60 } |
|
61 } |
|
62 } |
14 |
63 |
15 /// Combines copies information contained into revision `revs` to build a copy |
64 /// Combines copies information contained into revision `revs` to build a copy |
16 /// map. |
65 /// map. |
17 /// |
66 /// |
18 /// See mercurial/copies.py for details |
67 /// See mercurial/copies.py for details |
29 .iter() |
78 .iter() |
30 .map(|(k, v)| Ok((k.extract(py)?, v.extract(py)?))) |
79 .map(|(k, v)| Ok((k.extract(py)?, v.extract(py)?))) |
31 .collect::<PyResult<_>>()?; |
80 .collect::<PyResult<_>>()?; |
32 |
81 |
33 /// (Revision number, parent 1, parent 2, copy data for this revision) |
82 /// (Revision number, parent 1, parent 2, copy data for this revision) |
34 type RevInfo = (Revision, Revision, Revision, Option<PyBytes>); |
83 type RevInfo<Bytes> = (Revision, Revision, Revision, Option<Bytes>); |
35 |
84 |
36 let revs_info = revs.iter(py).map(|rev_py| -> PyResult<RevInfo> { |
85 let revs_info = |
37 let rev = rev_py.extract(py)?; |
86 revs.iter(py).map(|rev_py| -> PyResult<RevInfo<PyBytes>> { |
38 let tuple: PyTuple = |
87 let rev = rev_py.extract(py)?; |
39 rev_info.call(py, (rev_py,), None)?.cast_into(py)?; |
88 let tuple: PyTuple = |
40 let p1 = tuple.get_item(py, 0).extract(py)?; |
89 rev_info.call(py, (rev_py,), None)?.cast_into(py)?; |
41 let p2 = tuple.get_item(py, 1).extract(py)?; |
90 let p1 = tuple.get_item(py, 0).extract(py)?; |
42 let opt_bytes = tuple.get_item(py, 2).extract(py)?; |
91 let p2 = tuple.get_item(py, 1).extract(py)?; |
43 Ok((rev, p1, p2, opt_bytes)) |
92 let opt_bytes = tuple.get_item(py, 2).extract(py)?; |
44 }); |
93 Ok((rev, p1, p2, opt_bytes)) |
|
94 }); |
45 |
95 |
46 let path_copies = if !multi_thread { |
96 let path_copies = if !multi_thread { |
47 let mut combine_changeset_copies = |
97 let mut combine_changeset_copies = |
48 CombineChangesetCopies::new(children_count); |
98 CombineChangesetCopies::new(children_count); |
49 |
99 |
65 // is to gather data for them, an unbounded channel would keep |
115 // is to gather data for them, an unbounded channel would keep |
66 // growing and eat memory. |
116 // growing and eat memory. |
67 // |
117 // |
68 // TODO: tweak the bound? |
118 // TODO: tweak the bound? |
69 let (rev_info_sender, rev_info_receiver) = |
119 let (rev_info_sender, rev_info_receiver) = |
70 crossbeam_channel::bounded::<RevInfo>(1000); |
120 crossbeam_channel::bounded::<RevInfo<PyBytesWithData>>(1000); |
71 |
121 |
72 // Start a thread that does CPU-heavy processing in parallel with the |
122 // Start a thread that does CPU-heavy processing in parallel with the |
73 // loop below. |
123 // loop below. |
74 // |
124 // |
75 // If the parent thread panics, `rev_info_sender` will be dropped and |
125 // If the parent thread panics, `rev_info_sender` will be dropped and |
77 // exit its own loop. |
127 // exit its own loop. |
78 let thread = std::thread::spawn(move || { |
128 let thread = std::thread::spawn(move || { |
79 let mut combine_changeset_copies = |
129 let mut combine_changeset_copies = |
80 CombineChangesetCopies::new(children_count); |
130 CombineChangesetCopies::new(children_count); |
81 for (rev, p1, p2, opt_bytes) in rev_info_receiver { |
131 for (rev, p1, p2, opt_bytes) in rev_info_receiver { |
82 let gil = Python::acquire_gil(); |
|
83 let py = gil.python(); |
|
84 let files = match &opt_bytes { |
132 let files = match &opt_bytes { |
85 Some(raw) => ChangedFiles::new(raw.data(py)), |
133 Some(raw) => ChangedFiles::new(raw.data()), |
86 // Python None was extracted to Option::None, |
134 // Python None was extracted to Option::None, |
87 // meaning there was no copy data. |
135 // meaning there was no copy data. |
88 None => ChangedFiles::new_empty(), |
136 None => ChangedFiles::new_empty(), |
89 }; |
137 }; |
90 combine_changeset_copies.add_revision(rev, p1, p2, files) |
138 combine_changeset_copies.add_revision(rev, p1, p2, files) |
|
139 |
|
140 // The GIL is (still) implicitly acquired here through |
|
141 // `impl Drop for PyBytes`. |
91 } |
142 } |
92 |
143 |
93 combine_changeset_copies.finish(target_rev) |
144 combine_changeset_copies.finish(target_rev) |
94 }); |
145 }); |
95 |
146 |
96 for rev_info in revs_info { |
147 for rev_info in revs_info { |
97 let (rev, p1, p2, opt_bytes) = rev_info?; |
148 let (rev, p1, p2, opt_bytes) = rev_info?; |
|
149 let opt_bytes = opt_bytes.map(|b| PyBytesWithData::new(py, b)); |
98 |
150 |
99 // We’d prefer to avoid the child thread calling into Python code, |
151 // We’d prefer to avoid the child thread calling into Python code, |
100 // but this avoids a potential deadlock on the GIL if it does: |
152 // but this avoids a potential deadlock on the GIL if it does: |
101 py.allow_threads(|| { |
153 py.allow_threads(|| { |
102 rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( |
154 rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( |