Skip to content

Commit

Permalink
Support resetting of contention metric, fix deadlocking (#3)
Browse files Browse the repository at this point in the history
- Add a reset_contention_metric() method, which resets the contention metric and its timers.
- Fix deadlocking by internally spawning a thread dedicated to trying to obtain the GIL. This keeps the monitoring thread available to receive messages,
which also makes it possible to remove the flaky decorator and the xfail markers from the tests.

Ref dask/distributed#7290 (comment)
  • Loading branch information
milesgranger authored Jan 18, 2023
1 parent 8e107d3 commit aed7ec4
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "gil-knocker"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
authors = ["Miles Granger <[email protected]>"]
license = "MIT"
Expand Down
89 changes: 72 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[deny(missing_docs)]
use parking_lot::{const_rwlock, RwLock};
use pyo3::exceptions::PyRuntimeError;
use pyo3::ffi::{PyEval_InitThreads, PyEval_ThreadsInitialized};
use pyo3::prelude::*;
use pyo3::{
exceptions::{PyBrokenPipeError, PyTimeoutError, PyValueError},
Expand All @@ -23,6 +24,12 @@ fn gilknocker(_py: Python, m: &PyModule) -> PyResult<()> {
Ok(())
}

/// Possible messages to pass to the monitoring thread.
///
/// The common traits are derived so a message can be logged, compared and
/// passed around by value without ceremony.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Message {
    /// Ask the monitoring thread to leave its loop and finish.
    Stop,
    /// Ask the monitoring thread to zero the contention metric and restart
    /// its timers.
    Reset,
}

/// Struct for polling, knocking on the GIL,
/// checking if it's locked in the current thread
///
Expand All @@ -40,8 +47,8 @@ fn gilknocker(_py: Python, m: &PyModule) -> PyResult<()> {
#[derive(Default)]
pub struct KnockKnock {
handle: Option<thread::JoinHandle<()>>,
channel: Option<Sender<bool>>,
contention_metric: Option<Arc<RwLock<f32>>>,
channel: Option<Sender<Message>>,
contention_metric: Arc<RwLock<f32>>,
interval: Duration,
timeout: Duration,
}
Expand Down Expand Up @@ -72,32 +79,80 @@ impl KnockKnock {
/// and lower indicates less contention, with 0 theoretically indicating zero
/// contention.
#[getter]
pub fn contention_metric(&self) -> Option<f32> {
self.contention_metric.as_ref().map(|v| *(*v).read())
pub fn contention_metric(&self) -> f32 {
*(*self.contention_metric).read()
}

/// Reset the contention metric/monitoring state
pub fn reset_contention_metric(&mut self) -> PyResult<()> {
match &self.channel {
Some(channel) => {
channel
.send(Message::Reset)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;

// need to wait for thread to catch and process reset
while *(*self.contention_metric).read() > 0.005 {
thread::sleep(Duration::from_millis(1));
}
Ok(())
}
None => Err(PyValueError::new_err(
"Does not appear `start` was called, nothing to reset.",
)),
}
}

/// Start polling the GIL to check if it's locked.
pub fn start(&mut self, py: Python) -> () {
let (send, recv) = channel();
self.channel = Some(send);

unsafe {
if PyEval_ThreadsInitialized() == 0 {
PyEval_InitThreads();
}
}

let contention_metric = Arc::new(const_rwlock(0_f32));
self.contention_metric = Some(contention_metric.clone());
self.contention_metric = contention_metric.clone();

let interval = self.interval;
let handle = py.allow_threads(move || {
thread::spawn(move || {
let mut time_to_acquire = Duration::from_millis(0);
let runtime = Instant::now();
while recv
.recv_timeout(interval)
.unwrap_or_else(|e| e != RecvTimeoutError::Disconnected)
{
let start = Instant::now();
time_to_acquire += Python::with_gil(move |_py| start.elapsed());
{
let mut cm = (*contention_metric).write();
*cm = time_to_acquire.as_micros() as f32
/ runtime.elapsed().as_micros() as f32;
let mut runtime = Instant::now();
let mut handle: Option<thread::JoinHandle<Duration>> = None;
loop {
match recv.recv_timeout(interval) {
Ok(message) => match message {
Message::Stop => break,
Message::Reset => {
time_to_acquire = Duration::from_millis(0);
runtime = Instant::now();
*(*contention_metric).write() = 0_f32;
}
},
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => match handle {
Some(hdl) => {
if hdl.is_finished() {
time_to_acquire += hdl.join().unwrap();
let mut cm = (*contention_metric).write();
*cm = time_to_acquire.as_micros() as f32
/ runtime.elapsed().as_micros() as f32;
handle = None;
} else {
handle = Some(hdl);
}
}
None => {
handle = Some(thread::spawn(move || {
let start = Instant::now();
Python::with_gil(move |_py| start.elapsed())
}));
}
},
}
}
})
Expand All @@ -110,7 +165,7 @@ impl KnockKnock {
match take(&mut self.handle) {
Some(handle) => {
if let Some(send) = take(&mut self.channel) {
send.send(false)
send.send(Message::Stop)
.map_err(|e| PyBrokenPipeError::new_err(e.to_string()))?;

let start = Instant::now();
Expand Down
37 changes: 15 additions & 22 deletions tests/test_knockknock.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,6 @@
N_PTS = 4096


def flaky(n_tries=10):
    """Decorator factory that retries a callable up to ``n_tries`` times.

    The wrapped callable is invoked with the same arguments on every attempt.
    The first successful return value is passed through. Failures of the first
    ``n_tries - 1`` attempts are discarded; the final attempt runs unguarded,
    so its exception (if any) propagates to the caller.

    Only ``Exception`` subclasses trigger a retry. ``KeyboardInterrupt`` and
    ``SystemExit`` propagate immediately instead of being retried.
    """
    import functools

    def wrapper(func):
        # Keep the wrapped function's name/docstring for reporting.
        @functools.wraps(func)
        def _wrapper(*args, **kwargs):
            for _ in range(n_tries - 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # Deliberate best-effort: swallow and try again.
                    pass
            # Last attempt: let any failure surface.
            return func(*args, **kwargs)

        return _wrapper

    return wrapper


def a_lotta_gil():
"""Keep the GIL busy"""
for i in range(100_000_000):
Expand Down Expand Up @@ -52,13 +37,11 @@ def _run(target):
return knocker


@pytest.mark.xfail(raises=TimeoutError)
@flaky()
def test_knockknock_busy():
knocker = _run(a_lotta_gil)

try:
# usually ~0.9 on linux ~0.6 on windows
# usually ~0.9, but sometimes ~0.6 on Mac
assert knocker.contention_metric > 0.6

# Now wait for it to 'cool' back down
Expand All @@ -70,23 +53,33 @@ def test_knockknock_busy():
prev_cm = knocker.contention_metric

# ~0.15 oN mY MaChInE.
assert knocker.contention_metric < 0.2
assert knocker.contention_metric < 0.3
finally:
knocker.stop()


@pytest.mark.xfail(raises=TimeoutError)
@flaky()
def test_knockknock_available_gil():
    """With a workload that rarely holds the GIL, contention stays low."""
    knocker = _run(a_little_gil)

    try:
        # Typically lands around ~0.002 (earlier notes: ~0.001 on linux,
        # ~0.05 on windows), so 0.06 leaves headroom.
        observed = knocker.contention_metric
        assert observed < 0.06
    finally:
        # Always shut the monitor down, even when the assertion fails.
        knocker.stop()


def test_knockknock_reset_contention_metric():
    """Resetting after a GIL-heavy workload brings the metric back near zero."""
    knocker = _run(a_lotta_gil)

    try:
        # The busy workload should have pushed the metric well up.
        before_reset = knocker.contention_metric
        assert before_reset > 0.6

        knocker.reset_contention_metric()

        # Read the metric again right after the reset call returns.
        after_reset = knocker.contention_metric
        assert after_reset < 0.001
    finally:
        # Always shut the monitor down, even when an assertion fails.
        knocker.stop()


# Manual verification with py-spy
# busy should give high GIL %
if __name__ == "__main__":
Expand Down

0 comments on commit aed7ec4

Please sign in to comment.