Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4689,6 +4689,7 @@ dependencies = [
"crossbeam-deque",
"crossbeam-utils",
"libc",
"parking_lot",
"rand 0.9.2",
"rand_xorshift",
"scoped-tls",
Expand Down
8 changes: 2 additions & 6 deletions compiler/rustc_interface/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use rustc_ast::{LitKind, MetaItemKind, token};
use rustc_codegen_ssa::traits::CodegenBackend;
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
use rustc_data_structures::jobserver::{self, Proxy};
use rustc_data_structures::jobserver;
use rustc_errors::{DiagCtxtHandle, ErrorGuaranteed};
use rustc_lint::LintStore;
use rustc_middle::ty;
Expand Down Expand Up @@ -41,9 +41,6 @@ pub struct Compiler {

/// A reference to the current `GlobalCtxt` which we pass on to `GlobalCtxt`.
pub(crate) current_gcx: CurrentGcx,

/// A jobserver reference which we pass on to `GlobalCtxt`.
pub(crate) jobserver_proxy: Arc<Proxy>,
}

/// Converts strings provided as `--cfg [cfgspec]` into a `Cfg`.
Expand Down Expand Up @@ -410,7 +407,7 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
config.opts.unstable_opts.threads,
&config.extra_symbols,
SourceMapInputs { file_loader, path_mapping, hash_kind, checksum_hash_kind },
|current_gcx, jobserver_proxy| {
|current_gcx| {
// The previous `early_dcx` can't be reused here because it doesn't
// impl `Send`. Creating a new one is fine.
let early_dcx = EarlyDiagCtxt::new(config.opts.error_format);
Expand Down Expand Up @@ -482,7 +479,6 @@ pub fn run_compiler<R: Send>(config: Config, f: impl FnOnce(&Compiler) -> R + Se
codegen_backend,
override_queries: config.override_queries,
current_gcx,
jobserver_proxy,
};

// There are two paths out of `f`.
Expand Down
1 change: 0 additions & 1 deletion compiler/rustc_interface/src/passes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,6 @@ pub fn create_and_enter_global_ctxt<T, F: for<'tcx> FnOnce(TyCtxt<'tcx>) -> T>(
),
providers.hooks,
compiler.current_gcx.clone(),
Arc::clone(&compiler.jobserver_proxy),
|tcx| {
let feed = tcx.create_crate_num(stable_crate_id).unwrap();
assert_eq!(feed.key(), LOCAL_CRATE);
Expand Down
22 changes: 9 additions & 13 deletions compiler/rustc_interface/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ fn init_stack_size(early_dcx: &EarlyDiagCtxt) -> usize {
})
}

fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R: Send>(
fn run_in_thread_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
thread_stack_size: usize,
edition: Edition,
sm_inputs: SourceMapInputs,
Expand All @@ -156,7 +156,7 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R:
edition,
extra_symbols,
Some(sm_inputs),
|| f(CurrentGcx::new(), Proxy::new()),
|| f(CurrentGcx::new()),
)
})
.unwrap()
Expand All @@ -169,10 +169,7 @@ fn run_in_thread_with_globals<F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send, R:
})
}

pub(crate) fn run_in_thread_pool_with_globals<
F: FnOnce(CurrentGcx, Arc<Proxy>) -> R + Send,
R: Send,
>(
pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send, R: Send>(
thread_builder_diag: &EarlyDiagCtxt,
edition: Edition,
threads: usize,
Expand All @@ -196,11 +193,11 @@ pub(crate) fn run_in_thread_pool_with_globals<
edition,
sm_inputs,
extra_symbols,
|current_gcx, jobserver_proxy| {
|current_gcx| {
// Register the thread for use with the `WorkerLocal` type.
registry.register();

f(current_gcx, jobserver_proxy)
f(current_gcx)
},
);
};
Expand All @@ -209,13 +206,12 @@ pub(crate) fn run_in_thread_pool_with_globals<
let current_gcx2 = current_gcx.clone();

let proxy = Proxy::new();

let proxy_ = Arc::clone(&proxy);
let proxy__ = Arc::clone(&proxy);

let builder = rustc_thread_pool::ThreadPoolBuilder::new()
.thread_name(|_| "rustc".to_string())
.acquire_thread_handler(move || proxy_.acquire_thread())
.release_thread_handler(move || proxy__.release_thread())
.acquire_thread_handler(move || proxy.acquire_thread())
.release_thread_handler(move || proxy_.release_thread())
.num_threads(threads)
.deadlock_handler(move || {
// On deadlock, creates a new thread and forwards information in thread
Expand Down Expand Up @@ -291,7 +287,7 @@ internal compiler error: query cycle handler thread panicked, aborting process";
},
// Run `f` on the first thread in the thread pool.
move |pool: &rustc_thread_pool::ThreadPool| {
pool.install(|| f(current_gcx.into_inner(), proxy))
pool.install(|| f(current_gcx.into_inner()))
},
)
.unwrap_or_else(|err| {
Expand Down
52 changes: 20 additions & 32 deletions compiler/rustc_middle/src/query/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use std::hash::Hash;
use std::num::NonZero;
use std::sync::Arc;

use parking_lot::{Condvar, Mutex};
use parking_lot::Mutex;
use rustc_span::Span;

use crate::query::Cycle;
use crate::ty::TyCtxt;

/// A value uniquely identifying an active query job.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
Expand Down Expand Up @@ -54,15 +53,16 @@ impl<'tcx> QueryJob<'tcx> {
#[derive(Debug)]
pub struct QueryWaiter<'tcx> {
pub parent: Option<QueryJobId>,
pub condvar: Condvar,
// FIXME: could be made u16 due to rustc_thread_pool limiting number of threads
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think shrinking this field will change the struct size because the other fields are all 64-bits (on 64-bit platforms). I suggest just removing the comment.

pub thread_index: usize,
pub span: Span,
pub cycle: Mutex<Option<Cycle<'tcx>>>,
pub cycle: Arc<Mutex<Option<Cycle<'tcx>>>>,
}

#[derive(Clone, Debug)]
pub struct QueryLatch<'tcx> {
/// The `Option` is `Some(..)` when the job is active, and `None` once completed.
pub waiters: Arc<Mutex<Option<Vec<Arc<QueryWaiter<'tcx>>>>>>,
pub waiters: Arc<Mutex<Option<Vec<QueryWaiter<'tcx>>>>>,
}

impl<'tcx> QueryLatch<'tcx> {
Expand All @@ -71,45 +71,30 @@ impl<'tcx> QueryLatch<'tcx> {
}

/// Awaits for the query job to complete.
pub fn wait_on(
&self,
tcx: TyCtxt<'tcx>,
query: Option<QueryJobId>,
span: Span,
) -> Result<(), Cycle<'tcx>> {
pub fn wait_on(&self, query: Option<QueryJobId>, span: Span) -> Result<(), Cycle<'tcx>> {
let thread_index = rustc_thread_pool::current_thread_index().unwrap();
let mut waiters_guard = self.waiters.lock();
let Some(waiters) = &mut *waiters_guard else {
return Ok(()); // already complete
};

let waiter = Arc::new(QueryWaiter {
parent: query,
span,
cycle: Mutex::new(None),
condvar: Condvar::new(),
});
let mut cycle = Arc::new(Mutex::new(None));
let waiter = QueryWaiter { parent: query, span, cycle: Arc::clone(&cycle), thread_index };

// We push the waiter on to the `waiters` list. It can be accessed inside
// the `wait` call below, by 1) the `set` method or 2) by deadlock detection.
// Both of these will remove it from the `waiters` list before resuming
// this thread.
waiters.push(Arc::clone(&waiter));
waiters.push(waiter);

// Awaits the caller on this latch by blocking the current thread.
// If this detects a deadlock and the deadlock handler wants to resume this thread
// we have to be in the `wait` call. This is ensured by the deadlock handler
// getting the self.info lock.
rustc_thread_pool::mark_blocked();
tcx.jobserver_proxy.release_thread();
waiter.condvar.wait(&mut waiters_guard);
// Release the lock before we potentially block in `acquire_thread`
drop(waiters_guard);
tcx.jobserver_proxy.acquire_thread();

// FIXME: Get rid of this lock. We have ownership of the QueryWaiter
// although another thread may still have a Arc reference so we cannot
// use Arc::get_mut
let mut cycle = waiter.cycle.lock();
rustc_thread_pool::park(waiters_guard);

// We make sure to drop waiter before unparking a worker thread
let cycle = Arc::get_mut(&mut cycle).unwrap().get_mut();
match cycle.take() {
None => Ok(()),
Some(cycle) => Err(cycle),
Expand All @@ -122,14 +107,17 @@ impl<'tcx> QueryLatch<'tcx> {
let waiters = waiters_guard.take().unwrap(); // mark the latch as complete
let registry = rustc_thread_pool::Registry::current();
for waiter in waiters {
rustc_thread_pool::mark_unblocked(&registry);
waiter.condvar.notify_one();
// Return waiter thread's index to resume and drop `waiter` for resumed thread
// to use `Arc::get_mut` on its cycle arc pointer.
let waiter_thread = waiter.thread_index;
drop(waiter);
rustc_thread_pool::unpark(&registry, waiter_thread);
}
}

/// Removes a single waiter from the list of waiters.
/// This is used to break query cycles.
pub fn extract_waiter(&self, waiter: usize) -> Arc<QueryWaiter<'tcx>> {
pub fn extract_waiter(&self, waiter: usize) -> QueryWaiter<'tcx> {
let mut waiters_guard = self.waiters.lock();
let waiters = waiters_guard.as_mut().expect("non-empty waiters vec");
// Remove the waiter from the list of waiters
Expand Down
6 changes: 0 additions & 6 deletions compiler/rustc_middle/src/ty/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use rustc_ast as ast;
use rustc_data_structures::defer;
use rustc_data_structures::fx::FxHashMap;
use rustc_data_structures::intern::Interned;
use rustc_data_structures::jobserver::Proxy;
use rustc_data_structures::profiling::SelfProfilerRef;
use rustc_data_structures::sharded::{IntoPointer, ShardedHashMap};
use rustc_data_structures::stable_hasher::HashStable;
Expand Down Expand Up @@ -792,9 +791,6 @@ pub struct GlobalCtxt<'tcx> {
pub(crate) alloc_map: interpret::AllocMap<'tcx>,

current_gcx: CurrentGcx,

/// A jobserver reference used to release then acquire a token while waiting on a query.
pub jobserver_proxy: Arc<Proxy>,
}

impl<'tcx> GlobalCtxt<'tcx> {
Expand Down Expand Up @@ -981,7 +977,6 @@ impl<'tcx> TyCtxt<'tcx> {
query_system: QuerySystem<'tcx>,
hooks: crate::hooks::Providers,
current_gcx: CurrentGcx,
jobserver_proxy: Arc<Proxy>,
f: impl FnOnce(TyCtxt<'tcx>) -> T,
) -> T {
let data_layout = s.target.parse_data_layout().unwrap_or_else(|err| {
Expand Down Expand Up @@ -1019,7 +1014,6 @@ impl<'tcx> TyCtxt<'tcx> {
data_layout,
alloc_map: interpret::AllocMap::new(),
current_gcx,
jobserver_proxy,
});

// This is a separate function to work around a crash with parallel rustc (#135870)
Expand Down
2 changes: 1 addition & 1 deletion compiler/rustc_query_impl/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ fn wait_for_query<'tcx, C: QueryCache>(
let query_blocked_prof_timer = tcx.prof.query_blocked();

// With parallel queries we might just have to wait on some other thread.
let result = latch.wait_on(tcx, current, span);
let result = latch.wait_on(current, span);

match result {
Ok(()) => {
Expand Down
25 changes: 10 additions & 15 deletions compiler/rustc_query_impl/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::io::Write;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::{iter, mem};

use rustc_data_structures::fx::{FxHashMap, FxHashSet};
use rustc_errors::{Diag, DiagCtxtHandle};
use rustc_hir::def::DefKind;
use rustc_middle::queries::TaggedQueryKey;
use rustc_middle::query::{Cycle, QueryJob, QueryJobId, QueryLatch, QueryStackFrame, QueryWaiter};
use rustc_middle::query::{Cycle, QueryJob, QueryJobId, QueryLatch, QueryStackFrame};
use rustc_middle::ty::TyCtxt;
use rustc_span::{DUMMY_SP, Span};

Expand Down Expand Up @@ -304,11 +303,8 @@ fn process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId
}

/// Looks for a query cycle starting at `query`.
/// Returns a waiter to resume if a cycle is found.
fn find_and_process_cycle<'tcx>(
job_map: &QueryJobMap<'tcx>,
query: QueryJobId,
) -> Option<Arc<QueryWaiter<'tcx>>> {
/// Returns a waiter thread's index to resume if a cycle is found.
fn find_and_process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, query: QueryJobId) -> Option<usize> {
let mut visited = FxHashSet::default();
let mut stack = Vec::new();
if let ControlFlow::Break(resumable) =
Expand All @@ -327,8 +323,9 @@ fn find_and_process_cycle<'tcx>(
// Set the cycle error so it will be picked up when resumed
*waiter.cycle.lock() = Some(error);

// Put the waiter on the list of things to resume
Some(waiter)
// Return waiter thread's index to resume and drop `QueryWaiter::cycle` for resumed thread
// to use `Arc::get_mut`.
Some(waiter.thread_index)
} else {
None
}
Expand All @@ -342,17 +339,15 @@ fn find_and_process_cycle<'tcx>(
/// there will be multiple rounds through the deadlock handler if multiple cycles are present.
#[allow(rustc::potential_query_instability)]
pub fn break_query_cycle<'tcx>(job_map: QueryJobMap<'tcx>, registry: &rustc_thread_pool::Registry) {
// Look for a cycle starting at each query job
let waiter = job_map
// Look for a cycle starting at each query job,
let waiter_thread = job_map
.map
.keys()
.find_map(|query| find_and_process_cycle(&job_map, *query))
.expect("unable to find a query cycle");

// Mark the thread we're about to wake up as unblocked.
rustc_thread_pool::mark_unblocked(registry);

assert!(waiter.condvar.notify_one(), "unable to wake the waiter");
// Unpark one waiter thread.
assert!(rustc_thread_pool::unpark(registry, waiter_thread), "unable to wake the waiter");
}

pub fn print_query_stack<'tcx>(
Expand Down
1 change: 1 addition & 0 deletions compiler/rustc_thread_pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ categories = ["concurrency"]
[dependencies]
crossbeam-deque = "0.8"
crossbeam-utils = "0.8"
parking_lot = "0.12"
smallvec = "1.8.1"

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion compiler/rustc_thread_pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub use worker_local::WorkerLocal;
pub use self::broadcast::{BroadcastContext, broadcast, spawn_broadcast};
pub use self::join::{join, join_context};
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
pub use self::registry::{Registry, ThreadBuilder, mark_blocked, mark_unblocked};
pub use self::registry::{Registry, ThreadBuilder, park, unpark};
pub use self::scope::{Scope, ScopeFifo, in_place_scope, in_place_scope_fifo, scope, scope_fifo};
pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::{
Expand Down
Loading
Loading