copies-rust: send PyBytes values back be dropped ino the parent thread
… instead of acquiring the GIL in the Rust thread in the Drop impl
This commit is based on the premise that crossbeam-channel
with unbounded send and non-blocking receive is faster than
a contended GIL, but that remains to be measured.
Differential Revision: https://phab.mercurial-scm.org/D9686
--- a/rust/hg-cpython/src/copy_tracing.rs Thu Nov 26 18:23:51 2020 +0100
+++ b/rust/hg-cpython/src/copy_tracing.rs Tue Jan 05 21:46:21 2021 +0100
@@ -1,6 +1,7 @@
use cpython::ObjectProtocol;
use cpython::PyBytes;
use cpython::PyDict;
+use cpython::PyDrop;
use cpython::PyList;
use cpython::PyModule;
use cpython::PyObject;
@@ -58,6 +59,10 @@
// alive, and the returned slice borrows `self`.
unsafe { &*self.data }
}
+
+ pub fn unwrap(self) -> PyBytes {
+ self.keep_alive
+ }
}
}
@@ -93,7 +98,8 @@
Ok((rev, p1, p2, opt_bytes))
});
- let path_copies = if !multi_thread {
+ let path_copies;
+ if !multi_thread {
let mut combine_changeset_copies =
CombineChangesetCopies::new(children_count);
@@ -108,7 +114,7 @@
combine_changeset_copies.add_revision(rev, p1, p2, files)
}
- combine_changeset_copies.finish(target_rev)
+ path_copies = combine_changeset_copies.finish(target_rev)
} else {
// Use a bounded channel to provide back-pressure:
// if the child thread is slower to process revisions than this thread
@@ -119,6 +125,13 @@
let (rev_info_sender, rev_info_receiver) =
crossbeam_channel::bounded::<RevInfo<PyBytesWithData>>(1000);
+ // This channel (going the other way around) however is unbounded.
+ // If they were both bounded, there might potentially be deadlocks
+ // where both channels are full and both threads are waiting on each
+ // other.
+ let (pybytes_sender, pybytes_receiver) =
+ crossbeam_channel::unbounded();
+
// Start a thread that does CPU-heavy processing in parallel with the
// loop below.
//
@@ -135,10 +148,20 @@
// meaning there was no copy data.
None => ChangedFiles::new_empty(),
};
- combine_changeset_copies.add_revision(rev, p1, p2, files)
+ combine_changeset_copies.add_revision(rev, p1, p2, files);
- // The GIL is (still) implicitly acquired here through
- // `impl Drop for PyBytes`.
+ // Send `PyBytes` back to the parent thread so the parent
+ // thread can drop it. Otherwise the GIL would be implicitly
+ // acquired here through `impl Drop for PyBytes`.
+ if let Some(bytes) = opt_bytes {
+ if let Err(_) = pybytes_sender.send(bytes.unwrap()) {
+ // The channel is disconnected, meaning the parent
+ // thread panicked or returned
+ // early through
+ // `?` to propagate a Python exception.
+ break;
+ }
+ }
}
combine_changeset_copies.finish(target_rev)
@@ -155,10 +178,15 @@
"combine_changeset_copies: channel is disconnected",
);
});
+
+ // Drop anything in the channel, without blocking
+ for pybytes in pybytes_receiver.try_iter() {
+ pybytes.release_ref(py)
+ }
}
// We’d prefer to avoid the child thread calling into Python code,
// but this avoids a potential deadlock on the GIL if it does:
- py.allow_threads(|| {
+ path_copies = py.allow_threads(|| {
// Disconnect the channel to signal the child thread to stop:
// the `for … in rev_info_receiver` loop will end.
drop(rev_info_sender);
@@ -167,7 +195,12 @@
thread.join().unwrap_or_else(|panic_payload| {
std::panic::resume_unwind(panic_payload)
})
- })
+ });
+
+ // Drop anything left in the channel
+ for pybytes in pybytes_receiver.iter() {
+ pybytes.release_ref(py)
+ }
};
let out = PyDict::new(py);