From 00f2995d8813ccd80c0a425d062fef46e6bc8b03 Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Wed, 29 Apr 2026 15:18:21 +0300 Subject: [PATCH 1/2] Use thread index instead of condvar for query waiters --- Cargo.lock | 1 + compiler/rustc_interface/src/interface.rs | 8 +--- compiler/rustc_interface/src/passes.rs | 1 - compiler/rustc_interface/src/util.rs | 22 +++++------ compiler/rustc_middle/src/query/job.rs | 44 ++++++++-------------- compiler/rustc_middle/src/ty/context.rs | 6 --- compiler/rustc_query_impl/src/execution.rs | 2 +- compiler/rustc_query_impl/src/job.rs | 7 +--- compiler/rustc_thread_pool/Cargo.toml | 1 + compiler/rustc_thread_pool/src/lib.rs | 2 +- compiler/rustc_thread_pool/src/registry.rs | 20 +++++++--- 11 files changed, 47 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53cff99b4a199..4396caed11cf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4689,6 +4689,7 @@ dependencies = [ "crossbeam-deque", "crossbeam-utils", "libc", + "parking_lot", "rand 0.9.2", "rand_xorshift", "scoped-tls", diff --git a/compiler/rustc_interface/src/interface.rs b/compiler/rustc_interface/src/interface.rs index ab8bc1c7f1b34..d49afc8fd34df 100644 --- a/compiler/rustc_interface/src/interface.rs +++ b/compiler/rustc_interface/src/interface.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use rustc_ast::{LitKind, MetaItemKind, token}; use rustc_codegen_ssa::traits::CodegenBackend; use rustc_data_structures::fx::{FxHashMap, FxHashSet}; -use rustc_data_structures::jobserver::{self, Proxy}; +use rustc_data_structures::jobserver; use rustc_errors::{DiagCtxtHandle, ErrorGuaranteed}; use rustc_lint::LintStore; use rustc_middle::ty; @@ -41,9 +41,6 @@ pub struct Compiler { /// A reference to the current `GlobalCtxt` which we pass on to `GlobalCtxt`. pub(crate) current_gcx: CurrentGcx, - - /// A jobserver reference which we pass on to `GlobalCtxt`. - pub(crate) jobserver_proxy: Arc, } /// Converts strings provided as `--cfg [cfgspec]` into a `Cfg`. 
@@ -410,7 +407,7 @@ pub fn run_compiler(config: Config, f: impl FnOnce(&Compiler) -> R + Se config.opts.unstable_opts.threads, &config.extra_symbols, SourceMapInputs { file_loader, path_mapping, hash_kind, checksum_hash_kind }, - |current_gcx, jobserver_proxy| { + |current_gcx| { // The previous `early_dcx` can't be reused here because it doesn't // impl `Send`. Creating a new one is fine. let early_dcx = EarlyDiagCtxt::new(config.opts.error_format); @@ -482,7 +479,6 @@ pub fn run_compiler(config: Config, f: impl FnOnce(&Compiler) -> R + Se codegen_backend, override_queries: config.override_queries, current_gcx, - jobserver_proxy, }; // There are two paths out of `f`. diff --git a/compiler/rustc_interface/src/passes.rs b/compiler/rustc_interface/src/passes.rs index 38d899853cd5b..c245e591e6431 100644 --- a/compiler/rustc_interface/src/passes.rs +++ b/compiler/rustc_interface/src/passes.rs @@ -1001,7 +1001,6 @@ pub fn create_and_enter_global_ctxt FnOnce(TyCtxt<'tcx>) -> T>( ), providers.hooks, compiler.current_gcx.clone(), - Arc::clone(&compiler.jobserver_proxy), |tcx| { let feed = tcx.create_crate_num(stable_crate_id).unwrap(); assert_eq!(feed.key(), LOCAL_CRATE); diff --git a/compiler/rustc_interface/src/util.rs b/compiler/rustc_interface/src/util.rs index 24b23cc4199e9..535547569c724 100644 --- a/compiler/rustc_interface/src/util.rs +++ b/compiler/rustc_interface/src/util.rs @@ -130,7 +130,7 @@ fn init_stack_size(early_dcx: &EarlyDiagCtxt) -> usize { }) } -fn run_in_thread_with_globals) -> R + Send, R: Send>( +fn run_in_thread_with_globals R + Send, R: Send>( thread_stack_size: usize, edition: Edition, sm_inputs: SourceMapInputs, @@ -156,7 +156,7 @@ fn run_in_thread_with_globals) -> R + Send, R: edition, extra_symbols, Some(sm_inputs), - || f(CurrentGcx::new(), Proxy::new()), + || f(CurrentGcx::new()), ) }) .unwrap() @@ -169,10 +169,7 @@ fn run_in_thread_with_globals) -> R + Send, R: }) } -pub(crate) fn run_in_thread_pool_with_globals< - F: FnOnce(CurrentGcx, 
Arc<Proxy>) -> R + Send,
-    R: Send,
->(
+pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
     thread_builder_diag: &EarlyDiagCtxt,
     edition: Edition,
     threads: usize,
@@ -196,11 +193,11 @@ pub(crate) fn run_in_thread_pool_with_globals<
             edition,
             sm_inputs,
             extra_symbols,
-            |current_gcx, jobserver_proxy| {
+            |current_gcx| {
                 // Register the thread for use with the `WorkerLocal` type.
                 registry.register();
 
-                f(current_gcx, jobserver_proxy)
+                f(current_gcx)
             },
         );
     };
@@ -209,13 +206,12 @@
     let current_gcx2 = current_gcx.clone();
 
     let proxy = Proxy::new();
 
-    let proxy_ = Arc::clone(&proxy);
-    let proxy__ = Arc::clone(&proxy);
+    let proxy_ = Arc::clone(&proxy);
 
     let builder = rustc_thread_pool::ThreadPoolBuilder::new()
         .thread_name(|_| "rustc".to_string())
-        .acquire_thread_handler(move || proxy_.acquire_thread())
-        .release_thread_handler(move || proxy__.release_thread())
+        .acquire_thread_handler(move || proxy.acquire_thread())
+        .release_thread_handler(move || proxy_.release_thread())
         .num_threads(threads)
         .deadlock_handler(move || {
             // On deadlock, creates a new thread and forwards information in thread
@@ -291,7 +287,7 @@ internal compiler error: query cycle handler thread panicked, aborting process";
         },
         // Run `f` on the first thread in the thread pool.
         move |pool: &rustc_thread_pool::ThreadPool| {
-            pool.install(|| f(current_gcx.into_inner(), proxy))
+            pool.install(|| f(current_gcx.into_inner()))
         },
     )
     .unwrap_or_else(|err| {
diff --git a/compiler/rustc_middle/src/query/job.rs b/compiler/rustc_middle/src/query/job.rs
index 8c78bf24287e0..33e04134837b5 100644
--- a/compiler/rustc_middle/src/query/job.rs
+++ b/compiler/rustc_middle/src/query/job.rs
@@ -3,11 +3,10 @@
 use std::hash::Hash;
 use std::num::NonZero;
 use std::sync::Arc;
 
-use parking_lot::{Condvar, Mutex};
+use parking_lot::Mutex;
 use rustc_span::Span;
 
 use crate::query::Cycle;
-use crate::ty::TyCtxt;
 
 /// A value uniquely identifying an active query job.
 #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
@@ -54,15 +53,16 @@ impl<'tcx> QueryJob<'tcx> {
 #[derive(Debug)]
 pub struct QueryWaiter<'tcx> {
     pub parent: Option<QueryJobId>,
-    pub condvar: Condvar,
+    // FIXME: could be made u16 due to rustc_thread_pool limiting number of threads
+    pub thread_index: usize,
     pub span: Span,
-    pub cycle: Mutex<Option<Cycle<'tcx>>>,
+    pub cycle: Arc<Mutex<Option<Cycle<'tcx>>>>,
 }
 
 #[derive(Clone, Debug)]
 pub struct QueryLatch<'tcx> {
     /// The `Option` is `Some(..)` when the job is active, and `None` once completed.
-    pub waiters: Arc<Mutex<Option<Vec<Arc<QueryWaiter<'tcx>>>>>>,
+    pub waiters: Arc<Mutex<Option<Vec<QueryWaiter<'tcx>>>>>,
 }
 
 impl<'tcx> QueryLatch<'tcx> {
@@ -71,46 +71,33 @@ impl<'tcx> QueryLatch<'tcx> {
     }
 
     /// Awaits for the query job to complete.
-    pub fn wait_on(
-        &self,
-        tcx: TyCtxt<'tcx>,
-        query: Option<QueryJobId>,
-        span: Span,
-    ) -> Result<(), Cycle<'tcx>> {
+    pub fn wait_on(&self, query: Option<QueryJobId>, span: Span) -> Result<(), Cycle<'tcx>> {
+        let thread_index = rustc_thread_pool::current_thread_index().unwrap();
         let mut waiters_guard = self.waiters.lock();
         let Some(waiters) = &mut *waiters_guard else {
             return Ok(()); // already complete
         };
-        let waiter = Arc::new(QueryWaiter {
-            parent: query,
-            span,
-            cycle: Mutex::new(None),
-            condvar: Condvar::new(),
-        });
+        let cycle = Arc::new(Mutex::new(None));
+        let waiter = QueryWaiter { parent: query, span, cycle: Arc::clone(&cycle), thread_index };
 
         // We push the waiter on to the `waiters` list. It can be accessed inside
         // the `wait` call below, by 1) the `set` method or 2) by deadlock detection.
         // Both of these will remove it from the `waiters` list before resuming
         // this thread.
-        waiters.push(Arc::clone(&waiter));
+        waiters.push(waiter);
 
         // Awaits the caller on this latch by blocking the current thread.
         // If this detects a deadlock and the deadlock handler wants to resume this thread
         // we have to be in the `wait` call. This is ensured by the deadlock handler
         // getting the self.info lock.
-        rustc_thread_pool::mark_blocked();
-        tcx.jobserver_proxy.release_thread();
-        waiter.condvar.wait(&mut waiters_guard);
-        // Release the lock before we potentially block in `acquire_thread`
-        drop(waiters_guard);
-        tcx.jobserver_proxy.acquire_thread();
+        rustc_thread_pool::park(waiters_guard);
 
         // FIXME: Get rid of this lock. We have ownership of the QueryWaiter
         // although another thread may still have a Arc reference so we cannot
         // use Arc::get_mut
-        let mut cycle = waiter.cycle.lock();
-        match cycle.take() {
+        let mut cycle_lock = cycle.lock();
+        match cycle_lock.take() {
             None => Ok(()),
             Some(cycle) => Err(cycle),
         }
@@ -122,14 +109,13 @@ impl<'tcx> QueryLatch<'tcx> {
         let waiters = waiters_guard.take().unwrap(); // mark the latch as complete
         let registry = rustc_thread_pool::Registry::current();
         for waiter in waiters {
-            rustc_thread_pool::mark_unblocked(&registry);
-            waiter.condvar.notify_one();
+            rustc_thread_pool::unpark(&registry, waiter.thread_index);
         }
     }
 
     /// Removes a single waiter from the list of waiters.
     /// This is used to break query cycles.
-    pub fn extract_waiter(&self, waiter: usize) -> Arc<QueryWaiter<'tcx>> {
+    pub fn extract_waiter(&self, waiter: usize) -> QueryWaiter<'tcx> {
         let mut waiters_guard = self.waiters.lock();
         let waiters = waiters_guard.as_mut().expect("non-empty waiters vec");
         // Remove the waiter from the list of waiters
diff --git a/compiler/rustc_middle/src/ty/context.rs b/compiler/rustc_middle/src/ty/context.rs
index e26eb13243762..c8d51771f4a74 100644
--- a/compiler/rustc_middle/src/ty/context.rs
+++ b/compiler/rustc_middle/src/ty/context.rs
@@ -20,7 +20,6 @@
 use rustc_ast as ast;
 use rustc_data_structures::defer;
 use rustc_data_structures::fx::FxHashMap;
 use rustc_data_structures::intern::Interned;
-use rustc_data_structures::jobserver::Proxy;
 use rustc_data_structures::profiling::SelfProfilerRef;
 use rustc_data_structures::sharded::{IntoPointer, ShardedHashMap};
 use rustc_data_structures::stable_hasher::HashStable;
@@ -792,9 +791,6 @@ pub struct GlobalCtxt<'tcx> {
     pub(crate) alloc_map: interpret::AllocMap<'tcx>,
 
     current_gcx: CurrentGcx,
-
-    /// A jobserver reference used to release then acquire a token while waiting on a query.
-    pub jobserver_proxy: Arc<Proxy>,
 }
 
 impl<'tcx> GlobalCtxt<'tcx> {
@@ -981,7 +977,6 @@ impl<'tcx> TyCtxt<'tcx> {
         query_system: QuerySystem<'tcx>,
         hooks: crate::hooks::Providers,
         current_gcx: CurrentGcx,
-        jobserver_proxy: Arc<Proxy>,
         f: impl FnOnce(TyCtxt<'tcx>) -> T,
     ) -> T {
         let data_layout = s.target.parse_data_layout().unwrap_or_else(|err| {
@@ -1019,7 +1014,6 @@ impl<'tcx> TyCtxt<'tcx> {
             data_layout,
             alloc_map: interpret::AllocMap::new(),
             current_gcx,
-            jobserver_proxy,
         });
 
         // This is a separate function to work around a crash with parallel rustc (#135870)
diff --git a/compiler/rustc_query_impl/src/execution.rs b/compiler/rustc_query_impl/src/execution.rs
index ed9ad8c7a0a68..6d2c6aaed9ab6 100644
--- a/compiler/rustc_query_impl/src/execution.rs
+++ b/compiler/rustc_query_impl/src/execution.rs
@@ -247,7 +247,7 @@ fn wait_for_query<'tcx, C: QueryCache>(
     let query_blocked_prof_timer = tcx.prof.query_blocked();
 
     // With parallel queries we might just have to wait on some other thread.
-    let result = latch.wait_on(tcx, current, span);
+    let result = latch.wait_on(current, span);
 
     match result {
         Ok(()) => {
diff --git a/compiler/rustc_query_impl/src/job.rs b/compiler/rustc_query_impl/src/job.rs
index bf0493b29fd1e..a47b3fc62440c 100644
--- a/compiler/rustc_query_impl/src/job.rs
+++ b/compiler/rustc_query_impl/src/job.rs
@@ -1,6 +1,5 @@
 use std::io::Write;
 use std::ops::ControlFlow;
-use std::sync::Arc;
 use std::{iter, mem};
 
 use rustc_data_structures::fx::{FxHashMap, FxHashSet};
@@ -308,7 +307,7 @@ fn process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId
 fn find_and_process_cycle<'tcx>(
     job_map: &QueryJobMap<'tcx>,
     query: QueryJobId,
-) -> Option<Arc<QueryWaiter<'tcx>>> {
+) -> Option<QueryWaiter<'tcx>> {
     let mut visited = FxHashSet::default();
     let mut stack = Vec::new();
     if let ControlFlow::Break(resumable) =
@@ -350,9 +349,7 @@ pub fn break_query_cycle<'tcx>(job_map: QueryJobMap<'tcx>, registry: &rustc_thre
         .expect("unable to find a query cycle");
 
     // Mark the thread we're about to wake up as unblocked.
-    rustc_thread_pool::mark_unblocked(registry);
-
-    assert!(waiter.condvar.notify_one(), "unable to wake the waiter");
+    assert!(rustc_thread_pool::unpark(registry, waiter.thread_index), "unable to wake the waiter");
 }
 
 pub fn print_query_stack<'tcx>(
diff --git a/compiler/rustc_thread_pool/Cargo.toml b/compiler/rustc_thread_pool/Cargo.toml
index c92984470b7ae..dd3d4acfb0978 100644
--- a/compiler/rustc_thread_pool/Cargo.toml
+++ b/compiler/rustc_thread_pool/Cargo.toml
@@ -15,6 +15,7 @@ categories = ["concurrency"]
 [dependencies]
 crossbeam-deque = "0.8"
 crossbeam-utils = "0.8"
+parking_lot = "0.12"
 smallvec = "1.8.1"
 
 [dev-dependencies]
diff --git a/compiler/rustc_thread_pool/src/lib.rs b/compiler/rustc_thread_pool/src/lib.rs
index 7ce7fbc27eabe..3ac3f37a866b6 100644
--- a/compiler/rustc_thread_pool/src/lib.rs
+++ b/compiler/rustc_thread_pool/src/lib.rs
@@ -95,7 +95,7 @@ pub use worker_local::WorkerLocal;
 pub use self::broadcast::{BroadcastContext, broadcast, spawn_broadcast};
 pub use self::join::{join, join_context};
 use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
-pub use self::registry::{Registry, ThreadBuilder, mark_blocked, mark_unblocked};
+pub use self::registry::{Registry, ThreadBuilder, park, unpark};
 pub use self::scope::{Scope, ScopeFifo, in_place_scope, in_place_scope_fifo, scope, scope_fifo};
 pub use self::spawn::{spawn, spawn_fifo};
 pub use self::thread_pool::{
diff --git a/compiler/rustc_thread_pool/src/registry.rs b/compiler/rustc_thread_pool/src/registry.rs
index 9510c1842f86a..dc5be471914f3 100644
--- a/compiler/rustc_thread_pool/src/registry.rs
+++ b/compiler/rustc_thread_pool/src/registry.rs
@@ -617,19 +617,26 @@ impl Registry {
 /// Mark a Rayon worker thread as blocked.
This triggers the deadlock handler
 /// if no other worker thread is active
 #[inline]
-pub fn mark_blocked() {
+pub fn park<T>(mut mutex_guard: parking_lot::MutexGuard<'_, T>) {
     let worker_thread = WorkerThread::current();
     assert!(!worker_thread.is_null());
     unsafe {
-        let registry = &(*worker_thread).registry;
-        registry.sleep.mark_blocked(&registry.deadlock_handler)
+        let worker_thread = &*worker_thread;
+        let registry = &worker_thread.registry;
+        registry.sleep.mark_blocked(&registry.deadlock_handler);
+        registry.release_thread();
+        registry.thread_infos[worker_thread.index].condvar.wait(&mut mutex_guard);
+        // Release the lock before we potentially block in `acquire_thread`
+        drop(mutex_guard);
+        registry.acquire_thread();
     }
 }
 
 /// Mark a previously blocked Rayon worker thread as unblocked
 #[inline]
-pub fn mark_unblocked(registry: &Registry) {
-    registry.sleep.mark_unblocked()
+pub fn unpark(registry: &Registry, thread_index: usize) -> bool {
+    registry.sleep.mark_unblocked();
+    registry.thread_infos[thread_index].condvar.notify_one()
 }
 
 #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
@@ -655,6 +662,8 @@ struct ThreadInfo {
 
     /// the "stealer" half of the worker's deque
     stealer: Stealer<JobRef>,
+
+    condvar: parking_lot::Condvar,
 }
 
 impl ThreadInfo {
@@ -663,6 +672,7 @@ impl ThreadInfo {
             primed: LockLatch::new(),
             stopped: LockLatch::new(),
             terminate: OnceLatch::new(),
+            condvar: parking_lot::Condvar::new(),
             stealer,
         }
     }

From 0d8d68d7a84c4eaaf530dfffe5345455330c8cae Mon Sep 17 00:00:00 2001
From: Daria Sukhonina
Date: Thu, 30 Apr 2026 13:50:04 +0300
Subject: [PATCH 2/2] Use Arc::get_mut

---
 compiler/rustc_middle/src/query/job.rs | 16 +++++++++-------
 compiler/rustc_query_impl/src/job.rs   | 22 ++++++++++------------
 2 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/compiler/rustc_middle/src/query/job.rs b/compiler/rustc_middle/src/query/job.rs
index 33e04134837b5..14540d1c88fda 100644
--- a/compiler/rustc_middle/src/query/job.rs
+++ b/compiler/rustc_middle/src/query/job.rs
@@ -78,7 +78,7 @@ impl<'tcx> QueryLatch<'tcx> {
             return Ok(()); // already complete
         };
 
-        let cycle = Arc::new(Mutex::new(None));
+        let mut cycle = Arc::new(Mutex::new(None));
         let waiter = QueryWaiter { parent: query, span, cycle: Arc::clone(&cycle), thread_index };
 
         // We push the waiter on to the `waiters` list. It can be accessed inside
@@ -93,11 +93,9 @@ impl<'tcx> QueryLatch<'tcx> {
         // getting the self.info lock.
         rustc_thread_pool::park(waiters_guard);
 
-        // FIXME: Get rid of this lock. We have ownership of the QueryWaiter
-        // although another thread may still have a Arc reference so we cannot
-        // use Arc::get_mut
-        let mut cycle_lock = cycle.lock();
-        match cycle_lock.take() {
+        // We make sure to drop waiter before unparking a worker thread
+        let cycle = Arc::get_mut(&mut cycle).unwrap().get_mut();
+        match cycle.take() {
             None => Ok(()),
             Some(cycle) => Err(cycle),
         }
@@ -109,7 +107,11 @@ impl<'tcx> QueryLatch<'tcx> {
         let waiters = waiters_guard.take().unwrap(); // mark the latch as complete
         let registry = rustc_thread_pool::Registry::current();
         for waiter in waiters {
-            rustc_thread_pool::unpark(&registry, waiter.thread_index);
+            // Return waiter thread's index to resume and drop `waiter` for resumed thread
+            // to use `Arc::get_mut` on its cycle arc pointer.
+            let waiter_thread = waiter.thread_index;
+            drop(waiter);
+            rustc_thread_pool::unpark(&registry, waiter_thread);
         }
     }
 
diff --git a/compiler/rustc_query_impl/src/job.rs b/compiler/rustc_query_impl/src/job.rs
index a47b3fc62440c..cc8e16aece54d 100644
--- a/compiler/rustc_query_impl/src/job.rs
+++ b/compiler/rustc_query_impl/src/job.rs
@@ -6,7 +6,7 @@
 use rustc_data_structures::fx::{FxHashMap, FxHashSet};
 use rustc_errors::{Diag, DiagCtxtHandle};
 use rustc_hir::def::DefKind;
 use rustc_middle::queries::TaggedQueryKey;
-use rustc_middle::query::{Cycle, QueryJob, QueryJobId, QueryLatch, QueryStackFrame, QueryWaiter};
+use rustc_middle::query::{Cycle, QueryJob, QueryJobId, QueryLatch, QueryStackFrame};
 use rustc_middle::ty::TyCtxt;
 use rustc_span::{DUMMY_SP, Span};
 
@@ -303,11 +303,8 @@ fn process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId
 }
 
 /// Looks for a query cycle starting at `query`.
-/// Returns a waiter to resume if a cycle is found.
-fn find_and_process_cycle<'tcx>(
-    job_map: &QueryJobMap<'tcx>,
-    query: QueryJobId,
-) -> Option<QueryWaiter<'tcx>> {
+/// Returns a waiter thread's index to resume if a cycle is found.
+fn find_and_process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, query: QueryJobId) -> Option<usize> {
     let mut visited = FxHashSet::default();
     let mut stack = Vec::new();
     if let ControlFlow::Break(resumable) =
@@ -326,8 +323,9 @@ fn find_and_process_cycle<'tcx>(
         // Set the cycle error so it will be picked up when resumed
         *waiter.cycle.lock() = Some(error);
 
-        // Put the waiter on the list of things to resume
-        Some(waiter)
+        // Return waiter thread's index to resume and drop `QueryWaiter::cycle` for resumed thread
+        // to use `Arc::get_mut`.
+        Some(waiter.thread_index)
     } else {
         None
     }
@@ -341,15 +339,15 @@ fn find_and_process_cycle<'tcx>(
 /// there will be multiple rounds through the deadlock handler if multiple cycles are present.
 #[allow(rustc::potential_query_instability)]
 pub fn break_query_cycle<'tcx>(job_map: QueryJobMap<'tcx>, registry: &rustc_thread_pool::Registry) {
-    // Look for a cycle starting at each query job
-    let waiter = job_map
+    // Look for a cycle starting at each query job.
+    let waiter_thread = job_map
         .map
         .keys()
         .find_map(|query| find_and_process_cycle(&job_map, *query))
         .expect("unable to find a query cycle");
 
-    // Mark the thread we're about to wake up as unblocked.
-    assert!(rustc_thread_pool::unpark(registry, waiter.thread_index), "unable to wake the waiter");
+    // Unpark one waiter thread.
+    assert!(rustc_thread_pool::unpark(registry, waiter_thread), "unable to wake the waiter");
 }
 
 pub fn print_query_stack<'tcx>(