-
Notifications
You must be signed in to change notification settings - Fork 6
fix(runner): dont run health check in hot path #201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c010d98
9f81229
e2d87f9
9035594
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,17 +1,19 @@ | ||
| use crate::prelude::*; | ||
| use anyhow::Context; | ||
| use futures::StreamExt; | ||
| use nix::{sys::time::TimeValLike, time::clock_gettime}; | ||
| use runner_shared::artifacts::ExecutionTimestamps; | ||
| use runner_shared::fifo::{Command as FifoCommand, MarkerType}; | ||
| use runner_shared::fifo::{RUNNER_ACK_FIFO, RUNNER_CTL_FIFO}; | ||
| use std::cmp::Ordering; | ||
| use std::path::{Path, PathBuf}; | ||
| use std::{collections::HashSet, time::Duration}; | ||
| use tokio::io::{AsyncReadExt, AsyncWriteExt}; | ||
| use tokio::io::AsyncWriteExt; | ||
| use tokio::net::unix::pid_t; | ||
| use tokio::net::unix::pipe::OpenOptions as TokioPipeOpenOptions; | ||
| use tokio::net::unix::pipe::Receiver as TokioPipeReader; | ||
| use tokio::net::unix::pipe::Sender as TokioPipeSender; | ||
| use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; | ||
|
|
||
| fn create_fifo<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<()> { | ||
| // Remove the previous FIFO (if it exists) | ||
|
|
@@ -71,7 +73,7 @@ pub struct FifoBenchmarkData { | |
|
|
||
| pub struct RunnerFifo { | ||
| ack_fifo: TokioPipeSender, | ||
| ctl_fifo: TokioPipeReader, | ||
| ctl_reader: FramedRead<TokioPipeReader, LengthDelimitedCodec>, | ||
| } | ||
|
|
||
| fn get_pipe_open_options() -> TokioPipeOpenOptions { | ||
|
|
@@ -84,30 +86,37 @@ fn get_pipe_open_options() -> TokioPipeOpenOptions { | |
|
|
||
| impl RunnerFifo { | ||
| pub fn new() -> anyhow::Result<Self> { | ||
| create_fifo(RUNNER_CTL_FIFO)?; | ||
| create_fifo(RUNNER_ACK_FIFO)?; | ||
| Self::open(RUNNER_CTL_FIFO.as_ref(), RUNNER_ACK_FIFO.as_ref()) | ||
| } | ||
|
|
||
| pub fn open(ctl_path: &Path, ack_path: &Path) -> anyhow::Result<Self> { | ||
| create_fifo(ctl_path)?; | ||
| create_fifo(ack_path)?; | ||
|
|
||
| let ack_fifo = get_pipe_open_options().open_sender(ack_path)?; | ||
| let ctl_fifo = get_pipe_open_options().open_receiver(ctl_path)?; | ||
|
|
||
| let ack_fifo = get_pipe_open_options().open_sender(RUNNER_ACK_FIFO)?; | ||
| let ctl_fifo = get_pipe_open_options().open_receiver(RUNNER_CTL_FIFO)?; | ||
| let codec = LengthDelimitedCodec::builder() | ||
| .length_field_length(4) | ||
| .little_endian() | ||
| .new_codec(); | ||
| let ctl_reader = FramedRead::new(ctl_fifo, codec); | ||
|
|
||
| Ok(Self { ctl_fifo, ack_fifo }) | ||
| Ok(Self { | ||
| ack_fifo, | ||
| ctl_reader, | ||
| }) | ||
| } | ||
|
|
||
| pub async fn recv_cmd(&mut self) -> anyhow::Result<FifoCommand> { | ||
| let mut len_buffer = [0u8; 4]; | ||
| self.ctl_fifo.read_exact(&mut len_buffer).await?; | ||
| let message_len = u32::from_le_bytes(len_buffer) as usize; | ||
|
|
||
| let mut buffer = vec![0u8; message_len]; | ||
| loop { | ||
| if self.ctl_fifo.read_exact(&mut buffer).await.is_ok() { | ||
| break; | ||
| } | ||
| } | ||
| let bytes = self | ||
| .ctl_reader | ||
| .next() | ||
| .await | ||
| .ok_or_else(|| anyhow::anyhow!("FIFO stream closed"))??; | ||
|
|
||
| let decoded = bincode::deserialize(&buffer).with_context(|| { | ||
| format!("Failed to deserialize FIFO command (len: {message_len}, data: {buffer:?})") | ||
| })?; | ||
| let decoded = bincode::deserialize(&bytes) | ||
| .with_context(|| format!("Failed to deserialize FIFO command (data: {bytes:?})"))?; | ||
| Ok(decoded) | ||
| } | ||
|
|
||
|
|
@@ -145,90 +154,96 @@ impl RunnerFifo { | |
| }; | ||
|
|
||
| let mut benchmark_started = false; | ||
|
|
||
| // Outer loop: continues until health check fails | ||
| loop { | ||
| let is_alive = health_check().await?; | ||
| if !is_alive { | ||
| break; | ||
| } | ||
| // Inner loop: process commands until timeout/error | ||
| loop { | ||
| let result = tokio::time::timeout(Duration::from_secs(1), self.recv_cmd()).await; | ||
| let cmd = match result { | ||
| Ok(Ok(cmd)) => cmd, | ||
| Ok(Err(e)) => { | ||
| warn!("Failed to parse FIFO command: {e}"); | ||
| break; | ||
| } | ||
| Err(_) => break, // Timeout | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: is the error necessarily a timeout ? Shouldnt we check that it is indeed a timeout and panic if something else occurs ? |
||
| }; | ||
| trace!("Received command: {cmd:?}"); | ||
|
|
||
| let result = tokio::time::timeout(Duration::from_secs(1), self.recv_cmd()).await; | ||
| let cmd = match result { | ||
| Ok(Ok(cmd)) => cmd, | ||
| Ok(Err(e)) => { | ||
| warn!("Failed to parse FIFO command: {e}"); | ||
| break; | ||
| // Try executor-specific handler first | ||
| if let Some(response) = handle_cmd(&cmd).await? { | ||
| self.send_cmd(response).await?; | ||
| continue; | ||
| } | ||
| Err(_) => continue, | ||
| }; | ||
| trace!("Received command: {cmd:?}"); | ||
|
|
||
| // Try executor-specific handler first | ||
| if let Some(response) = handle_cmd(&cmd).await? { | ||
| self.send_cmd(response).await?; | ||
| continue; | ||
| } | ||
|
|
||
| // Fall through to shared implementation for standard commands | ||
| match &cmd { | ||
| FifoCommand::CurrentBenchmark { pid, uri } => { | ||
| bench_order_by_timestamp.push((current_time(), uri.to_string())); | ||
| bench_pids.insert(*pid); | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| FifoCommand::StartBenchmark => { | ||
| if !benchmark_started { | ||
| benchmark_started = true; | ||
| markers.push(MarkerType::SampleStart(current_time())); | ||
| } else { | ||
| warn!("Received duplicate StartBenchmark command, ignoring"); | ||
| // Fall through to shared implementation for standard commands | ||
| match &cmd { | ||
| FifoCommand::CurrentBenchmark { pid, uri } => { | ||
| bench_order_by_timestamp.push((current_time(), uri.to_string())); | ||
| bench_pids.insert(*pid); | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| FifoCommand::StopBenchmark => { | ||
| if benchmark_started { | ||
| benchmark_started = false; | ||
| markers.push(MarkerType::SampleEnd(current_time())); | ||
| } else { | ||
| warn!("Received StopBenchmark command before StartBenchmark, ignoring"); | ||
| FifoCommand::StartBenchmark => { | ||
| if !benchmark_started { | ||
| benchmark_started = true; | ||
| markers.push(MarkerType::SampleStart(current_time())); | ||
| } else { | ||
| warn!("Received duplicate StartBenchmark command, ignoring"); | ||
| } | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| FifoCommand::SetIntegration { name, version } => { | ||
| integration = Some((name.into(), version.into())); | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| FifoCommand::AddMarker { marker, .. } => { | ||
| markers.push(*marker); | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| FifoCommand::SetVersion(protocol_version) => { | ||
| match protocol_version.cmp(&runner_shared::fifo::CURRENT_PROTOCOL_VERSION) { | ||
| Ordering::Less => { | ||
| if *protocol_version | ||
| < runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION | ||
| { | ||
| bail!( | ||
| "Integration is using a version of the protocol that is smaller than the minimal supported protocol version ({protocol_version} < {}). \ | ||
| Please update the integration to a supported version.", | ||
| runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION | ||
| ); | ||
| } | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| FifoCommand::StopBenchmark => { | ||
| if benchmark_started { | ||
| benchmark_started = false; | ||
| markers.push(MarkerType::SampleEnd(current_time())); | ||
| } else { | ||
| warn!("Received StopBenchmark command before StartBenchmark, ignoring"); | ||
| } | ||
| Ordering::Greater => bail!( | ||
| "Runner is using an incompatible protocol version ({} < {protocol_version}). Please update the runner to the latest version.", | ||
| runner_shared::fifo::CURRENT_PROTOCOL_VERSION | ||
| ), | ||
| Ordering::Equal => { | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| FifoCommand::SetIntegration { name, version } => { | ||
| integration = Some((name.into(), version.into())); | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| FifoCommand::AddMarker { marker, .. } => { | ||
| markers.push(*marker); | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| FifoCommand::SetVersion(protocol_version) => { | ||
| match protocol_version.cmp(&runner_shared::fifo::CURRENT_PROTOCOL_VERSION) { | ||
| Ordering::Less => { | ||
| if *protocol_version | ||
| < runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION | ||
| { | ||
| bail!( | ||
| "Integration is using a version of the protocol that is smaller than the minimal supported protocol version ({protocol_version} < {}). \ | ||
| Please update the integration to a supported version.", | ||
| runner_shared::fifo::MINIMAL_SUPPORTED_PROTOCOL_VERSION | ||
| ); | ||
| } | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| Ordering::Greater => bail!( | ||
| "Runner is using an incompatible protocol version ({} < {protocol_version}). Please update the runner to the latest version.", | ||
| runner_shared::fifo::CURRENT_PROTOCOL_VERSION | ||
| ), | ||
| Ordering::Equal => { | ||
| self.send_cmd(FifoCommand::Ack).await?; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| _ => { | ||
| warn!("Unhandled FIFO command: {cmd:?}"); | ||
| self.send_cmd(FifoCommand::Err).await?; | ||
| _ => { | ||
| warn!("Unhandled FIFO command: {cmd:?}"); | ||
| self.send_cmd(FifoCommand::Err).await?; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let is_alive = health_check().await?; | ||
| if !is_alive { | ||
| info!("Process terminated, stopping the command handler"); | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| let marker_result = ExecutionTimestamps::new(&bench_order_by_timestamp, &markers); | ||
|
|
@@ -240,3 +255,41 @@ impl RunnerFifo { | |
| Ok((marker_result, fifo_data)) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use std::time::Duration; | ||
| use tokio::io::AsyncWriteExt; | ||
|
|
||
| #[tokio::test] | ||
| async fn recv_cmd_is_not_cancel_safe() { | ||
| let temp_dir = tempfile::tempdir().unwrap(); | ||
| let ctl_path = temp_dir.path().join("ctl_fifo"); | ||
| let ack_path = temp_dir.path().join("ack_fifo"); | ||
|
|
||
| let mut fifo = RunnerFifo::open(&ctl_path, &ack_path).unwrap(); | ||
| let mut writer = get_pipe_open_options().open_sender(&ctl_path).unwrap(); | ||
|
|
||
| let cmd = FifoCommand::Ack; | ||
| let payload = bincode::serialize(&cmd).unwrap(); | ||
| let len_bytes = (payload.len() as u32).to_le_bytes(); | ||
|
|
||
| tokio::spawn(async move { | ||
| writer.write_all(&len_bytes).await.unwrap(); | ||
| writer.write_all(&payload[..1]).await.unwrap(); | ||
| tokio::time::sleep(Duration::from_millis(50)).await; | ||
| writer.write_all(&payload[1..]).await.unwrap(); | ||
| }); | ||
|
|
||
| let first = tokio::time::timeout(Duration::from_millis(10), fifo.recv_cmd()).await; | ||
| assert!(first.is_err(), "Expected timeout on first recv_cmd"); | ||
|
|
||
| let second = tokio::time::timeout(Duration::from_millis(200), fifo.recv_cmd()).await; | ||
|
|
||
| assert!( | ||
| matches!(second, Ok(Ok(FifoCommand::Ack))), | ||
| "recv_cmd should be cancel-safe: expected Ok(Ok(Ack)), got: {second:?}" | ||
| ); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,10 +31,7 @@ use runner_shared::metadata::PerfMetadata; | |
| use runner_shared::unwind_data::UnwindData; | ||
| use std::path::Path; | ||
| use std::path::PathBuf; | ||
| use std::sync::Arc; | ||
| use std::time::Duration; | ||
| use std::{cell::OnceCell, collections::HashMap, process::ExitStatus}; | ||
| use tokio::sync::Mutex; | ||
|
|
||
| mod jit_dump; | ||
| mod memory_mappings; | ||
|
|
@@ -205,10 +202,12 @@ impl PerfRunner { | |
| let cmd = wrap_with_sudo(wrapped_builder)?.build(); | ||
| debug!("cmd: {cmd:?}"); | ||
|
|
||
| let on_process_started = async |_| -> anyhow::Result<()> { | ||
| let on_process_started = async |pid| -> anyhow::Result<()> { | ||
| // If we output pipedata, we do not parse the perf map during teardown yet, so we need to parse memory | ||
| // maps as we receive the `CurrentBenchmark` fifo commands. | ||
| let data = Self::handle_fifo(runner_fifo, perf_fifo, self.output_pipedata).await?; | ||
| let data = | ||
| Self::handle_fifo(runner_fifo, perf_fifo, self.output_pipedata, pid as pid_t) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can't we type pid in the closure parameter typing directly ? |
||
| .await?; | ||
| self.benchmark_data.set(data).unwrap_or_else(|_| { | ||
| error!("Failed to set benchmark data in PerfRunner"); | ||
| }); | ||
|
|
@@ -248,37 +247,44 @@ impl PerfRunner { | |
|
|
||
| async fn handle_fifo( | ||
| mut runner_fifo: RunnerFifo, | ||
| perf_fifo: PerfFifo, | ||
| mut perf_fifo: PerfFifo, | ||
| parse_memory_maps: bool, | ||
| pid: pid_t, | ||
| ) -> anyhow::Result<BenchmarkData> { | ||
| let mut symbols_by_pid = HashMap::<pid_t, ProcessSymbols>::new(); | ||
| let mut unwind_data_by_pid = HashMap::<pid_t, Vec<UnwindData>>::new(); | ||
|
|
||
| let perf_fifo = Arc::new(Mutex::new(perf_fifo)); | ||
| let mut perf_ping_timeout = 5; | ||
| // The runner spawns a `bash` process, which will execute perf and the benchmark. To check if it | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I'd rather we have an explicit thread to handle this, instead of hacking something that relies on the executor having a root bash process. We don't necessarily need to switch to a full multi-threaded async runtime, we could explicitely spawn a thread and handle this from there. Let's create a dedicated task for this IMO. |
||
| // terminated, we have to check if all the sub-processes terminated. | ||
| // We can't check for the `bash` process, because it will terminate after the FIFO handler has | ||
| // finished (because we're using a single-threaded runtime). | ||
| let mut sys = sysinfo::System::new(); | ||
| let health_check = async || { | ||
| let perf_ping = tokio::time::timeout(Duration::from_secs(perf_ping_timeout), async { | ||
| perf_fifo.lock().await.ping().await | ||
| }) | ||
| .await; | ||
| if let Ok(Err(_)) | Err(_) = perf_ping { | ||
| debug!("Failed to ping perf FIFO, ending perf fifo loop"); | ||
| return Ok(false); | ||
| } | ||
| // Perf has started successfully, we can decrease the timeout for future pings | ||
| perf_ping_timeout = 1; | ||
| sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true); | ||
| let process = sys.process(sysinfo::Pid::from_u32(pid as u32)); | ||
|
|
||
| match process { | ||
| None => Ok(false), | ||
| Some(_proc) => { | ||
| let has_children = sys.processes().values().any(|p| { | ||
| p.parent() | ||
| .map(|parent_pid| parent_pid.as_u32() == pid as u32) | ||
| .unwrap_or(false) | ||
| }); | ||
|
|
||
| Ok(true) | ||
| Ok(has_children) | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| let on_cmd = async |cmd: &FifoCommand| { | ||
| #[allow(deprecated)] | ||
| match cmd { | ||
| FifoCommand::StartBenchmark => { | ||
| perf_fifo.lock().await.start_events().await?; | ||
| perf_fifo.start_events().await?; | ||
| } | ||
| FifoCommand::StopBenchmark => { | ||
| perf_fifo.lock().await.stop_events().await?; | ||
| perf_fifo.stop_events().await?; | ||
| } | ||
| FifoCommand::CurrentBenchmark { pid, .. } => { | ||
| #[cfg(target_os = "linux")] | ||
|
|
@@ -294,7 +300,7 @@ impl PerfRunner { | |
| } | ||
| } | ||
| FifoCommand::PingPerf => { | ||
| if perf_fifo.lock().await.ping().await.is_err() { | ||
| if perf_fifo.ping().await.is_err() { | ||
| return Ok(Some(FifoCommand::Err)); | ||
| } | ||
| return Ok(Some(FifoCommand::Ack)); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldnt this just be until timeout ?
Do we really want to continue the loops if we somehow fail to parse a command ?
(Note: your comment is in line with the implementation, I'm just ocnsidering why we wouldnt stop completely in case of error)