diff --git a/pingora-load-balancing/src/lib.rs b/pingora-load-balancing/src/lib.rs index 0e1bc6e5..65cbdc85 100644 --- a/pingora-load-balancing/src/lib.rs +++ b/pingora-load-balancing/src/lib.rs @@ -444,6 +444,41 @@ where } } +impl LoadBalancer { + /// Select a backend and return an RAII lease that tracks the inflight request. + pub fn select_with_lease( + &self, + key: &[u8], + max_iterations: usize, + ) -> Option<(Backend, selection::LeastConnLease)> { + self.select_with_lease_with(key, max_iterations, |_, health| health) + } + + /// Like [`Self::select_with_lease`] but with a custom `accept` predicate. + /// + /// See [`Self::select_with`] for `accept` semantics. + pub fn select_with_lease_with( + &self, + key: &[u8], + max_iterations: usize, + accept: F, + ) -> Option<(Backend, selection::LeastConnLease)> + where + F: Fn(&Backend, bool) -> bool, + { + let selection = self.selector.load(); + let mut iter = UniqueIterator::new(selection.iter(key), max_iterations); + while let Some(b) = iter.get_next() { + if accept(&b, self.backends.ready(&b)) { + if let Some(lease) = selection.acquire_lease(&b) { + return Some((b, lease)); + } + } + } + None + } +} + #[cfg(test)] mod test { use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; diff --git a/pingora-load-balancing/src/selection/least.rs b/pingora-load-balancing/src/selection/least.rs new file mode 100644 index 00000000..bdd2634d --- /dev/null +++ b/pingora-load-balancing/src/selection/least.rs @@ -0,0 +1,316 @@ +// Copyright 2026 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Least connections backend selection. + +use super::{Backend, BackendIter, BackendSelection}; +use std::collections::{BTreeSet, HashMap}; +use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering::Relaxed}; +use std::sync::{Arc, Mutex, Weak}; + +/// Shared store for per backend inflight counters. +#[derive(Debug, Default)] +pub struct LeastConnStateStore { + // Mutex is locked only during rebuild!! + handles: Mutex>>, +} + +impl LeastConnStateStore { + fn get_or_create(&self, hash_key: u64) -> Arc { + let mut map = self.handles.lock().expect("state store lock poisoned"); + if let Some(existing) = map.get(&hash_key).and_then(Weak::upgrade) { + return existing; + } + let handle = Arc::new(AtomicU32::new(0)); + map.insert(hash_key, Arc::downgrade(&handle)); + handle + } + + fn cleanup_stale(&self) { + let mut map = self.handles.lock().expect("state store lock poisoned"); + map.retain(|_, w| w.strong_count() > 0); + } +} + +/// Configuration for [`LeastConnections`] selection. +#[derive(Clone, Debug, Default)] +pub struct LeastConnectionsConfig { + state_store: Arc, +} + +impl LeastConnectionsConfig { + pub fn new() -> Self { + Self::default() + } + + pub(crate) fn state_store(&self) -> &LeastConnStateStore { + &self.state_store + } +} + +/// Least connections backend selectr. +pub struct LeastConnections { + backends: Box<[Backend]>, + handles: Box<[Arc]>, + index_by_hash: HashMap, + /// Monotonic counter for round robin tie-breaking among equally loaded backends. + tie_breaker: AtomicUsize, +} + +impl LeastConnections { + /// Increment the inflight counter and return a lease. + pub(crate) fn acquire_lease(&self, backend: &Backend) -> Option { + let &idx = self.index_by_hash.get(&backend.hash_key())?; + let handle = self.handles[idx].clone(); + handle.fetch_add(1, Relaxed); + Some(LeastConnLease { handle }) + } + + #[allow(dead_code)] + pub(crate) fn active_connections(&self, backend: &Backend) -> Option { + let &idx = self.index_by_hash.get(&backend.hash_key())?; + Some(self.handles[idx].load(Relaxed)) + } + + #[allow(dead_code)] + pub(crate) fn handle_for_backend(&self, backend: &Backend) -> Option> { + let &idx = self.index_by_hash.get(&backend.hash_key())?; + Some(self.handles[idx].clone()) + } +} + +impl BackendSelection for LeastConnections { + type Iter = LeastConnectionsIterator; + type Config = LeastConnectionsConfig; + + fn build(backends: &BTreeSet) -> Self { + Self::build_with_config(backends, &LeastConnectionsConfig::default()) + } + + fn build_with_config(backends: &BTreeSet, config: &Self::Config) -> Self { + let store = config.state_store(); + let mut backend = Vec::with_capacity(backends.len()); + let mut handles = Vec::with_capacity(backends.len()); + let mut index_by_hash = HashMap::with_capacity(backends.len()); + + for (idx, b) in backends.iter().enumerate() { + index_by_hash.insert(b.hash_key(), idx); + handles.push(store.get_or_create(b.hash_key())); + backend.push(b.clone()); + } + + store.cleanup_stale(); + + Self { + backends: backend.into_boxed_slice(), + handles: handles.into_boxed_slice(), + index_by_hash, + tie_breaker: AtomicUsize::new(0), + } + } + + fn iter(self: &Arc, _key: &[u8]) -> Self::Iter { + LeastConnectionsIterator::new(self.clone()) + } +} + +/// Iterator that yields backends in ascending weighted load order. +/// +/// ## algorithm +/// +/// - Snapshot: read all inflight counters. +/// - Sort: order backend indices by normalised load +/// (`a_active * b_weight` cmp with `b_active * a_weight`). +/// - Tie-break: when normalised loads are equal, a rotating offset picks +/// which backend comes first. The offset increments on every call, +/// giving round robin fairness among equally loaded backends. Weight is not +/// involved in tie-breaking - it is already factored into the load comparison. +pub struct LeastConnectionsIterator { + selector: Arc, + sorted: Box<[usize]>, + cursor: usize, +} + +impl LeastConnectionsIterator { + fn new(selector: Arc) -> Self { + let len = selector.backends.len(); + let mut indices: Vec = (0..len).collect(); + + if len > 1 { + let inflight_counters: Vec = + selector.handles.iter().map(|h| h.load(Relaxed)).collect(); + + let rotation = selector.tie_breaker.fetch_add(1, Relaxed) % len; + + indices.sort_by(|&left, &right| { + let l_weight = selector.backends[left].weight.max(1) as u64; + let r_weight = selector.backends[right].weight.max(1) as u64; + let l_load = inflight_counters[left] as u64 * r_weight; + let r_load = inflight_counters[right] as u64 * l_weight; + + l_load.cmp(&r_load).then_with(|| { + let l_rank = (left + len - rotation) % len; + let r_rank = (right + len - rotation) % len; + l_rank.cmp(&r_rank) + }) + }); + } + + Self { + selector, + sorted: indices.into_boxed_slice(), + cursor: 0, + } + } +} + +impl BackendIter for LeastConnectionsIterator { + fn next(&mut self) -> Option<&Backend> { + let &idx = self.sorted.get(self.cursor)?; + self.cursor += 1; + self.selector.backends.get(idx) + } +} + +/// RAII lease for least connections selection. +#[derive(Debug)] +pub struct LeastConnLease { + handle: Arc, +} + +impl LeastConnLease { + pub fn inflight(&self) -> u32 { + self.handle.load(Relaxed) + } +} + +impl Drop for LeastConnLease { + fn drop(&mut self) { + self.handle.fetch_sub(1, Relaxed); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn backend(addr: &str, weight: usize) -> Backend { + Backend::new_with_weight(addr, weight).unwrap() + } + + #[test] + fn test_prefers_backend_with_smallest_load() { + let b1 = backend("1.1.1.1:80", 1); + let b2 = backend("1.0.0.1:80", 1); + let set = BTreeSet::from_iter([b1.clone(), b2.clone()]); + + let selector = Arc::new(LeastConnections::build(&set)); + let _lease = selector.acquire_lease(&b1).unwrap(); + + let mut iter = selector.iter(b""); + assert_eq!(iter.next(), Some(&b2)); + assert_eq!(iter.next(), Some(&b1)); + } + + #[test] + fn test_weighted_comparison() { + // to avoid dropping leases + let mut leases = vec![]; + let b0 = backend("1.1.1.0:80", 1); + let b1 = backend("1.1.1.1:80", 2); + let b2 = backend("1.1.1.2:80", 1); + let b3 = backend("1.1.1.3:80", 1); + let set = BTreeSet::from_iter([b0.clone(), b1.clone(), b2.clone(), b3.clone()]); + + let selector = Arc::new(LeastConnections::build(&set)); + + // setup loads: [5,3,7,2] + for _ in 0..5 { + leases.push(selector.acquire_lease(&b0).unwrap()); + } + for _ in 0..3 { + leases.push(selector.acquire_lease(&b1).unwrap()); + } + for _ in 0..7 { + leases.push(selector.acquire_lease(&b2).unwrap()); + } + for _ in 0..2 { + leases.push(selector.acquire_lease(&b3).unwrap()); + } + + // normalized load is active/weight (compared without division as: + // left_active * right_weight vs right_active * left_weight) + // + // with loads [5,3,7,2] and weights [1,2,1,1]: + // b1=3/2=1.5 < b3=2/1=2 < b0=5/1=5 < b2=7/1=7 + + let mut iter = selector.iter(b""); + + assert_eq!(iter.next(), Some(&b1)); + assert_eq!(iter.next(), Some(&b3)); + assert_eq!(iter.next(), Some(&b0)); + assert_eq!(iter.next(), Some(&b2)); + } + + #[test] + fn test_fair_tie_break() { + let b1 = backend("1.1.1.1:80", 1); + let b2 = backend("1.0.0.1:80", 1); + let set = BTreeSet::from_iter([b1.clone(), b2.clone()]); + let selector = Arc::new(LeastConnections::build(&set)); + + let mut first_is_b1 = 0usize; + let mut first_is_b2 = 0usize; + for _ in 0..8 { + match selector.iter(b"").next() { + Some(chosen) if *chosen == b1 => first_is_b1 += 1, + Some(chosen) if *chosen == b2 => first_is_b2 += 1, + _ => unreachable!(), + } + } + + assert_eq!(first_is_b1, 4); + assert_eq!(first_is_b2, 4); + } + + #[test] + fn test_lease_drop_decrements_counter() { + let b1 = backend("1.1.1.1:80", 1); + let set = BTreeSet::from_iter([b1.clone()]); + let selector = Arc::new(LeastConnections::build(&set)); + + assert_eq!(selector.active_connections(&b1), Some(0)); + let lease = selector.acquire_lease(&b1).unwrap(); + assert_eq!(selector.active_connections(&b1), Some(1)); + drop(lease); + assert_eq!(selector.active_connections(&b1), Some(0)); + } + + #[test] + fn test_state_store_preserves_handles_across_rebuilds() { + let b1 = backend("1.1.1.1:80", 1); + let config = LeastConnectionsConfig::default(); + + let set_v1 = BTreeSet::from_iter([b1.clone()]); + let selector_v1 = LeastConnections::build_with_config(&set_v1, &config); + let handle_v1 = selector_v1.handle_for_backend(&b1).unwrap(); + + let b1_fresh = backend("1.1.1.1:80", 1); + let set_v2 = BTreeSet::from_iter([b1_fresh.clone()]); + let selector_v2 = LeastConnections::build_with_config(&set_v2, &config); + let handle_v2 = selector_v2.handle_for_backend(&b1_fresh).unwrap(); + + assert!(Arc::ptr_eq(&handle_v1, &handle_v2)); + } +} diff --git a/pingora-load-balancing/src/selection/mod.rs b/pingora-load-balancing/src/selection/mod.rs index 3e9d85ed..a41c54ec 100644 --- a/pingora-load-balancing/src/selection/mod.rs +++ b/pingora-load-balancing/src/selection/mod.rs @@ -16,6 +16,7 @@ pub mod algorithms; pub mod consistent; +pub mod least; pub mod weighted; use super::Backend; @@ -82,7 +83,12 @@ pub type RoundRobin = Weighted; /// Consistent Ketama hashing on weighted backends pub type Consistent = consistent::KetamaHashing; -// TODO: least conn +/// Least connections selection on weighted backends +pub type LeastConnections = least::LeastConnections; +/// Configuration for [`LeastConnections`] selection +pub type LeastConnectionsConfig = least::LeastConnectionsConfig; +/// RAII lease for least connections selection +pub type LeastConnLease = least::LeastConnLease; /// An iterator which wraps another iterator and yields unique items. It optionally takes a max /// number of iterations if the wrapped iterator never returns. diff --git a/pingora-proxy/examples/least_connections.rs b/pingora-proxy/examples/least_connections.rs new file mode 100644 index 00000000..d3d208e8 --- /dev/null +++ b/pingora-proxy/examples/least_connections.rs @@ -0,0 +1,108 @@ +// Copyright 2026 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use log::info; +use pingora_core::server::configuration::Opt; +use pingora_core::server::Server; +use pingora_core::services::background::background_service; +use pingora_core::upstreams::peer::HttpPeer; +use pingora_core::Result; +use pingora_load_balancing::{ + health_check, + selection::{LeastConnLease, LeastConnections}, + LoadBalancer, +}; +use pingora_proxy::{ProxyHttp, Session}; +use std::sync::Arc; +use std::time::Duration; + +// CTX object (with a lease) will be dropped at the end of the request +struct Ctx { + lease: Option, +} + +struct LB(Arc>); + +#[async_trait] +impl ProxyHttp for LB { + type CTX = Ctx; + + fn new_ctx(&self) -> Self::CTX { + Ctx { lease: None } + } + + async fn upstream_peer( + &self, + _session: &mut Session, + ctx: &mut Self::CTX, + ) -> Result> { + let (upstream, lease) = self.0.select_with_lease(b"", 256).unwrap(); + + // we need to store this lease until the end of the request + ctx.lease = Some(lease); + + info!("upstream peer is: {:?}", upstream); + + Ok(Box::new(HttpPeer::new( + upstream, + true, + "one.one.one.one".to_string(), + ))) + } + + async fn upstream_request_filter( + &self, + _session: &mut Session, + upstream_request: &mut pingora_http::RequestHeader, + _ctx: &mut Self::CTX, + ) -> Result<()> { + upstream_request + .insert_header("Host", "one.one.one.one") + .unwrap(); + Ok(()) + } +} + +// RUST_LOG=INFO cargo run --example least_connections +fn main() { + env_logger::init(); + + let opt = Opt::parse_args(); + let mut my_server = Server::new(Some(opt)).unwrap(); + my_server.bootstrap(); + + let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap(); + + let hc = health_check::TcpHealthCheck::new(); + upstreams.set_health_check(hc); + upstreams.health_check_frequency = Some(Duration::from_secs(1)); + + let background = background_service("least connections health check", upstreams); + let upstreams = background.task(); + + let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams)); + lb.add_tcp("0.0.0.0:6190"); + + let cert_path = format!("{}/tests/keys/server.crt", env!("CARGO_MANIFEST_DIR")); + let key_path = format!("{}/tests/keys/key.pem", env!("CARGO_MANIFEST_DIR")); + let mut tls_settings = + pingora_core::listeners::tls::TlsSettings::intermediate(&cert_path, &key_path).unwrap(); + tls_settings.enable_h2(); + lb.add_tls_with_settings("0.0.0.0:6191", None, tls_settings); + + my_server.add_service(lb); + my_server.add_service(background); + my_server.run_forever(); +}