diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java index 8936863716..0cd66e497b 100644 --- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/AsyncCachingExec.java @@ -58,6 +58,7 @@ import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.concurrent.CancellableDependency; +import org.apache.hc.core5.concurrent.ComplexCancellable; import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.ContentType; @@ -98,13 +99,25 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler private final HttpAsyncCache responseCache; private final DefaultAsyncCacheRevalidator cacheRevalidator; private final ConditionalRequestBuilder conditionalRequestBuilder; + private final boolean requestCollapsingEnabled; + private final CacheRequestCollapser collapser; AsyncCachingExec(final HttpAsyncCache cache, final DefaultAsyncCacheRevalidator cacheRevalidator, final CacheConfig config) { + this(cache, cacheRevalidator, config, false); + } + + AsyncCachingExec( + final HttpAsyncCache cache, + final DefaultAsyncCacheRevalidator cacheRevalidator, + final CacheConfig config, + final boolean requestCollapsingEnabled) { super(config); this.responseCache = Args.notNull(cache, "Response cache"); this.cacheRevalidator = cacheRevalidator; this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(request -> BasicRequestBuilder.copy(request).build()); + this.requestCollapsingEnabled = requestCollapsingEnabled; + this.collapser = requestCollapsingEnabled ? new CacheRequestCollapser() : null; } AsyncCachingExec( @@ -274,6 +287,96 @@ public void completed(final CacheMatch result) { final CacheHit hit = result != null ? result.hit : null; final CacheHit root = result != null ? result.root : null; if (hit == null) { + if (requestCollapsingEnabled && root == null && entityProducer == null && !requestCacheControl.isOnlyIfCached()) { + final String cacheKey = CacheKeyGenerator.INSTANCE.generateKey(target, request); + final CacheRequestCollapser.Token token = collapser.enter(cacheKey); + if (token.isLeader()) { + handleCacheMiss(requestCacheControl, null, target, request, null, scope, chain, new AsyncExecCallback() { + + @Override + public AsyncDataConsumer handleResponse( + final HttpResponse response, + final EntityDetails entityDetails) throws HttpException, IOException { + try { + return asyncExecCallback.handleResponse(response, entityDetails); + } catch (final HttpException | IOException ex) { + token.complete(); + throw ex; + } + } + + @Override + public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { + try { + asyncExecCallback.handleInformationResponse(response); + } catch (final HttpException | IOException ex) { + token.complete(); + throw ex; + } + } + + @Override + public void completed() { + try { + asyncExecCallback.completed(); + } finally { + token.complete(); + } + } + + @Override + public void failed(final Exception cause) { + try { + asyncExecCallback.failed(cause); + } finally { + token.complete(); + } + } + + }); + } else { + // Stable holder owned by the follower: registered with the outer operation + // exactly once so the cache-lookup dependency installed by the await task + // is never overwritten from the outside. + final ComplexCancellable follower = new ComplexCancellable(); + operation.setDependency(follower); + collapser.await(token, follower, () -> { + if (follower.isCancelled()) { + return; + } + follower.setDependency(responseCache.match(target, request, new FutureCallback() { + + @Override + public void completed(final CacheMatch result) { + final CacheHit hit = result != null ? result.hit : null; + final CacheHit root = result != null ? result.root : null; + if (hit == null) { + handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback); + } else { + final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry); + if (LOG.isDebugEnabled()) { + LOG.debug("{} response cache control: {}", exchangeId, responseCacheControl); + } + context.setResponseCacheControl(responseCacheControl); + handleCacheHit(requestCacheControl, responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback); + } + } + + @Override + public void failed(final Exception cause) { + asyncExecCallback.failed(cause); + } + + @Override + public void cancelled() { + asyncExecCallback.failed(new InterruptedIOException()); + } + + })); + }); + } + return; + } handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback); } else { final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry); diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapser.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapser.java new file mode 100644 index 0000000000..6766d55630 --- /dev/null +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CacheRequestCollapser.java @@ -0,0 +1,158 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.concurrent.CancellableDependency; +import org.apache.hc.core5.util.Args; + +/** + * Coordinates concurrent requests for the same cache key so that only one request + * goes to the backend while others wait for it to complete and then re-check the cache. + *

+ * Each {@link AsyncCachingExec} owns its own instance; collapse state is never shared + * across executors or clients. + */ +@Internal +final class CacheRequestCollapser { + + static final class Token { + + private final ConcurrentHashMap inflight; + private final String key; + private final Entry entry; + private final boolean leader; + + private Token(final ConcurrentHashMap inflight, final String key, final Entry entry, final boolean leader) { + this.inflight = inflight; + this.key = key; + this.entry = entry; + this.leader = leader; + } + + boolean isLeader() { + return leader; + } + + void complete() { + if (entry.completed.compareAndSet(false, true)) { + inflight.remove(key, entry); + entry.drain(); + } + } + + } + + private static final class Waiter implements Cancellable { + + private final AtomicBoolean cancelled; + private final Runnable task; + + private Waiter(final Runnable task) { + this.cancelled = new AtomicBoolean(false); + this.task = task; + } + + @Override + public boolean cancel() { + return cancelled.compareAndSet(false, true); + } + + void runIfNotCancelled() { + if (!cancelled.get()) { + task.run(); + } + } + + } + + private static final class Entry { + + private final AtomicBoolean completed; + private final ConcurrentLinkedQueue waiters; + + private Entry() { + this.completed = new AtomicBoolean(false); + this.waiters = new ConcurrentLinkedQueue<>(); + } + + private void await(final CancellableDependency holder, final Runnable task) { + if (completed.get()) { + task.run(); + return; + } + final Waiter waiter = new Waiter(task); + // Install the waiter into the holder before publishing it to the queue so + // a concurrent drain cannot run the task before cancellation is wired up. + holder.setDependency(waiter); + waiters.add(waiter); + if (completed.get()) { + drain(); + } + } + + private void drain() { + for (; ; ) { + final Waiter waiter = waiters.poll(); + if (waiter == null) { + return; + } + waiter.runIfNotCancelled(); + } + } + + } + + private final ConcurrentHashMap inflight; + + CacheRequestCollapser() { + this.inflight = new ConcurrentHashMap<>(); + } + + Token enter(final String key) { + Args.notEmpty(key, "Key"); + final Entry created = new Entry(); + final Entry existing = inflight.putIfAbsent(key, created); + if (existing == null) { + return new Token(inflight, key, created, true); + } + return new Token(inflight, key, existing, false); + } + + void await(final Token token, final CancellableDependency holder, final Runnable task) { + Args.notNull(token, "Token"); + Args.notNull(holder, "Holder"); + Args.notNull(task, "Task"); + token.entry.await(holder, task); + } + +} diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingH2AsyncClientBuilder.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingH2AsyncClientBuilder.java index 5cd9f33767..323fcd0a3d 100644 --- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingH2AsyncClientBuilder.java +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingH2AsyncClientBuilder.java @@ -56,6 +56,7 @@ public class CachingH2AsyncClientBuilder extends H2AsyncClientBuilder { private SchedulingStrategy schedulingStrategy; private CacheConfig cacheConfig; private boolean deleteCache; + private boolean requestCollapsingEnabled; public static CachingH2AsyncClientBuilder create() { return new CachingH2AsyncClientBuilder(); @@ -112,6 +113,18 @@ public CachingH2AsyncClientBuilder setDeleteCache(final boolean deleteCache) { return this; } + /** + * Enables request collapsing for cacheable requests. When enabled, concurrent + * requests for the same cache key are coalesced so that only one request goes + * to the backend while the others wait and then re-check the cache. + * + * @since 5.7 + */ + public CachingH2AsyncClientBuilder setRequestCollapsingEnabled(final boolean requestCollapsingEnabled) { + this.requestCollapsingEnabled = requestCollapsingEnabled; + return this; + } + @Override protected void customizeExecChain(final NamedElementChain execChainDefinition) { final CacheConfig config = this.cacheConfig != null ? this.cacheConfig : CacheConfig.DEFAULT; @@ -156,7 +169,8 @@ protected void customizeExecChain(final NamedElementChain final AsyncCachingExec cachingExec = new AsyncCachingExec( httpCache, cacheRevalidator, - config); + config, + requestCollapsingEnabled); execChainDefinition.addBefore(ChainElement.PROTOCOL.name(), cachingExec, ChainElement.CACHING.name()); } diff --git a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingHttpAsyncClientBuilder.java b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingHttpAsyncClientBuilder.java index 2dda48a9a2..918697355c 100644 --- a/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingHttpAsyncClientBuilder.java +++ b/httpclient5-cache/src/main/java/org/apache/hc/client5/http/impl/cache/CachingHttpAsyncClientBuilder.java @@ -60,6 +60,7 @@ public class CachingHttpAsyncClientBuilder extends HttpAsyncClientBuilder { private SchedulingStrategy schedulingStrategy; private CacheConfig cacheConfig; private boolean deleteCache; + private boolean requestCollapsingEnabled; public static CachingHttpAsyncClientBuilder create() { return new CachingHttpAsyncClientBuilder(); @@ -116,6 +117,18 @@ public CachingHttpAsyncClientBuilder setDeleteCache(final boolean deleteCache) { return this; } + /** + * Enables request collapsing for cacheable requests. When enabled, concurrent + * requests for the same cache key are coalesced so that only one request goes + * to the backend while the others wait and then re-check the cache. + * + * @since 5.7 + */ + public CachingHttpAsyncClientBuilder setRequestCollapsingEnabled(final boolean requestCollapsingEnabled) { + this.requestCollapsingEnabled = requestCollapsingEnabled; + return this; + } + @Override protected void customizeExecChain(final NamedElementChain execChainDefinition) { final CacheConfig config = this.cacheConfig != null ? this.cacheConfig : CacheConfig.DEFAULT; @@ -160,7 +173,8 @@ protected void customizeExecChain(final NamedElementChain final AsyncCachingExec cachingExec = new AsyncCachingExec( httpCache, cacheRevalidator, - config); + config, + requestCollapsingEnabled); execChainDefinition.addBefore(ChainElement.PROTOCOL.name(), cachingExec, ChainElement.CACHING.name()); } diff --git a/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAsyncCachingExecRequestCollapsing.java b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAsyncCachingExecRequestCollapsing.java new file mode 100644 index 0000000000..da5bb757a3 --- /dev/null +++ b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestAsyncCachingExecRequestCollapsing.java @@ -0,0 +1,215 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.cache.CacheResponseStatus; +import org.apache.hc.client5.http.cache.HttpCacheContext; +import org.apache.hc.client5.http.cache.RequestCacheControl; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.utils.DateUtils; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.io.CloseMode; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestAsyncCachingExecRequestCollapsing { + + private static final class RoundResult { + private final int originHits; + private final int cacheMisses; + private final int cacheHits; + + private RoundResult(final int originHits, final int cacheMisses, final int cacheHits) { + this.originHits = originHits; + this.cacheMisses = cacheMisses; + this.cacheHits = cacheHits; + } + } + + @Test + void testRequestCollapsingPreventsThunderingHerdOnColdMiss() throws Exception { + final AtomicInteger originHits = new AtomicInteger(0); + + final HttpServer server = HttpServer.create(new InetSocketAddress(0), 0); + server.createContext("/", exchange -> handleOrigin(exchange, originHits)); + final ExecutorService executorService = Executors.newCachedThreadPool(); + server.setExecutor(executorService); + server.start(); + + try { + final int port = server.getAddress().getPort(); + final HttpHost target = new HttpHost("http", "localhost", port); + final int concurrent = 20; + + originHits.set(0); + final RoundResult baseline = runRound(target, concurrent, false, originHits); + Assertions.assertEquals(concurrent, baseline.originHits, "Baseline must hit origin N times"); + Assertions.assertEquals(concurrent, baseline.cacheMisses, "Baseline must be all CACHE_MISS on cold miss"); + Assertions.assertEquals(0, baseline.cacheHits, "Baseline must have no CACHE_HIT on cold miss"); + + originHits.set(0); + final RoundResult collapsed = runRound(target, concurrent, true, originHits); + Assertions.assertEquals(1, collapsed.originHits, "Collapsing must allow only one origin request"); + Assertions.assertEquals(1, collapsed.cacheMisses, "Collapsing must have exactly one CACHE_MISS leader"); + Assertions.assertEquals(concurrent - 1, collapsed.cacheHits, "Collapsing must serve followers from cache"); + } finally { + server.stop(0); + executorService.shutdownNow(); + } + } + + private static void handleOrigin(final HttpExchange exchange, final AtomicInteger originHits) throws IOException { + originHits.incrementAndGet(); + + // Keep the origin "busy" so concurrent client requests overlap and all see a cold cache. + try { + Thread.sleep(250); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + final byte[] body = "OK".getBytes(StandardCharsets.US_ASCII); + + exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=us-ascii"); + exchange.getResponseHeaders().add("Cache-Control", "public, max-age=60"); + exchange.getResponseHeaders().add("Date", DateUtils.formatStandardDate(Instant.now())); + + exchange.sendResponseHeaders(200, body.length); + try (final OutputStream out = exchange.getResponseBody()) { + out.write(body); + } + } + + private static RoundResult runRound( + final HttpHost target, + final int concurrent, + final boolean requestCollapsingEnabled, + final AtomicInteger originHits) throws Exception { + + final CacheConfig cacheConfig = CacheConfig.custom() + .setHeuristicCachingEnabled(false) + .build(); + + final AtomicInteger cacheMisses = new AtomicInteger(0); + final AtomicInteger cacheHits = new AtomicInteger(0); + + final CloseableHttpAsyncClient client = CachingHttpAsyncClients.custom() + .setCacheConfig(cacheConfig) + .setResourceFactory(HeapResourceFactory.INSTANCE) + .setRequestCollapsingEnabled(requestCollapsingEnabled) + .build(); + + client.start(); + + try { + final List> futures = new ArrayList<>(concurrent); + final CountDownLatch done = new CountDownLatch(concurrent); + final AtomicInteger failures = new AtomicInteger(0); + + for (int i = 0; i < concurrent; i++) { + final SimpleHttpRequest request = SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath("/") + .build(); + + // Use one context per request. + final HttpCacheContext context = HttpCacheContext.create(); + context.setRequestCacheControl(RequestCacheControl.DEFAULT); + + futures.add(client.execute( + SimpleRequestProducer.create(request), + SimpleResponseConsumer.create(), + context, + new FutureCallback() { + + @Override + public void completed(final SimpleHttpResponse result) { + final CacheResponseStatus status = context.getCacheResponseStatus(); + if (status == CacheResponseStatus.CACHE_MISS) { + cacheMisses.incrementAndGet(); + } else if (status == CacheResponseStatus.CACHE_HIT) { + cacheHits.incrementAndGet(); + } else { + // For this test we only expect HIT or MISS. + failures.incrementAndGet(); + } + done.countDown(); + } + + @Override + public void failed(final Exception ex) { + failures.incrementAndGet(); + done.countDown(); + } + + @Override + public void cancelled() { + failures.incrementAndGet(); + done.countDown(); + } + + })); + } + + Assertions.assertTrue(done.await(30, TimeUnit.SECONDS), "Requests did not complete in time"); + Assertions.assertEquals(0, failures.get(), "Unexpected failures / cache statuses"); + + // Also ensure futures are all done / propagate any hidden exception. + for (final Future f : futures) { + f.get(5, TimeUnit.SECONDS); + } + + return new RoundResult(originHits.get(), cacheMisses.get(), cacheHits.get()); + } finally { + client.close(CloseMode.GRACEFUL); + } + } + +} diff --git a/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestCacheRequestCollapser.java b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestCacheRequestCollapser.java new file mode 100644 index 0000000000..db4769ab6b --- /dev/null +++ b/httpclient5-cache/src/test/java/org/apache/hc/client5/http/impl/cache/TestCacheRequestCollapser.java @@ -0,0 +1,222 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.concurrent.ComplexCancellable; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestCacheRequestCollapser { + + @Test + void testSingleLeaderForSameKey() throws Exception { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + final int threads = 32; + + final ExecutorService executor = Executors.newFixedThreadPool(threads); + try { + final CountDownLatch ready = new CountDownLatch(threads); + final CountDownLatch start = new CountDownLatch(1); + + final AtomicInteger leaders = new AtomicInteger(0); + final List> futures = new ArrayList<>(threads); + + for (int i = 0; i < threads; i++) { + futures.add(executor.submit(() -> { + ready.countDown(); + start.await(5, TimeUnit.SECONDS); + final CacheRequestCollapser.Token token = collapser.enter(key); + if (token.isLeader()) { + leaders.incrementAndGet(); + } + return token; + })); + } + + Assertions.assertTrue(ready.await(5, TimeUnit.SECONDS)); + start.countDown(); + + final List tokens = new ArrayList<>(threads); + for (final Future f : futures) { + tokens.add(f.get(5, TimeUnit.SECONDS)); + } + + Assertions.assertEquals(1, leaders.get(), "Expected exactly one leader"); + + CacheRequestCollapser.Token leaderToken = null; + for (final CacheRequestCollapser.Token t : tokens) { + if (t.isLeader()) { + leaderToken = t; + break; + } + } + Assertions.assertNotNull(leaderToken); + leaderToken.complete(); + + // After completion, the next enter must produce a fresh leader. + final CacheRequestCollapser.Token next = collapser.enter(key); + Assertions.assertTrue(next.isLeader()); + next.complete(); + } finally { + executor.shutdownNow(); + } + } + + @Test + void testAwaitRunsAfterComplete() throws Exception { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + Assertions.assertTrue(leader.isLeader()); + + final CacheRequestCollapser.Token follower = collapser.enter(key); + Assertions.assertFalse(follower.isLeader()); + + final AtomicInteger runs = new AtomicInteger(0); + final ComplexCancellable holder = new ComplexCancellable(); + collapser.await(follower, holder, runs::incrementAndGet); + Assertions.assertEquals(0, runs.get()); + + leader.complete(); + + // drain() runs synchronously inside complete(), so the task has already executed. + Assertions.assertEquals(1, runs.get()); + } + + @Test + void testCancelledWaiterDoesNotRun() { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + final CacheRequestCollapser.Token follower = collapser.enter(key); + + final AtomicInteger runs = new AtomicInteger(0); + final ComplexCancellable holder = new ComplexCancellable(); + collapser.await(follower, holder, runs::incrementAndGet); + Assertions.assertTrue(holder.cancel()); + + leader.complete(); + + Assertions.assertEquals(0, runs.get(), "Cancelled waiter must not run"); + } + + @Test + void testCompleteIsIdempotent() { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + final CacheRequestCollapser.Token follower = collapser.enter(key); + + final AtomicInteger runs = new AtomicInteger(0); + final ComplexCancellable holder = new ComplexCancellable(); + collapser.await(follower, holder, runs::incrementAndGet); + + leader.complete(); + leader.complete(); + leader.complete(); + + Assertions.assertEquals(1, runs.get(), "Waiters must run exactly once"); + } + + @Test + void testLeaderCompletesBetweenEnterAndAwait() { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + final CacheRequestCollapser.Token follower = collapser.enter(key); + Assertions.assertFalse(follower.isLeader()); + + leader.complete(); + + final AtomicInteger runs = new AtomicInteger(0); + final ComplexCancellable holder = new ComplexCancellable(); + collapser.await(follower, holder, runs::incrementAndGet); + + Assertions.assertEquals(1, runs.get(), "Task must run synchronously when the leader is already complete"); + } + + @Test + void testFollowerCancelWhileWaiting() { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + final CacheRequestCollapser.Token follower = collapser.enter(key); + + final AtomicInteger runs = new AtomicInteger(0); + final ComplexCancellable holder = new ComplexCancellable(); + collapser.await(follower, holder, runs::incrementAndGet); + + Assertions.assertTrue(holder.cancel(), "Outer cancel via the holder must succeed"); + Assertions.assertTrue(holder.isCancelled()); + + leader.complete(); + + Assertions.assertEquals(0, runs.get(), "Task must not run after the holder has been cancelled"); + } + + @Test + void testLeaderFailureReleasesFollowers() { + final CacheRequestCollapser collapser = new CacheRequestCollapser(); + final String key = "k"; + + final CacheRequestCollapser.Token leader = collapser.enter(key); + final CacheRequestCollapser.Token f1 = collapser.enter(key); + final CacheRequestCollapser.Token f2 = collapser.enter(key); + final CacheRequestCollapser.Token f3 = collapser.enter(key); + + final AtomicInteger runs = new AtomicInteger(0); + collapser.await(f1, new ComplexCancellable(), runs::incrementAndGet); + collapser.await(f2, new ComplexCancellable(), runs::incrementAndGet); + collapser.await(f3, new ComplexCancellable(), runs::incrementAndGet); + + // Simulate the leader failure path: callback.failed() ends with token.complete(). + leader.complete(); + + Assertions.assertEquals(3, runs.get(), "All followers must be released when the leader completes (success or failure)"); + + // After release the key is free and the next caller becomes a fresh leader. + final CacheRequestCollapser.Token next = collapser.enter(key); + Assertions.assertTrue(next.isLeader()); + next.complete(); + } + +}