Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.concurrent.ComplexCancellable;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
Expand Down Expand Up @@ -98,13 +99,25 @@ class AsyncCachingExec extends CachingExecBase implements AsyncExecChainHandler
private final HttpAsyncCache responseCache;
private final DefaultAsyncCacheRevalidator cacheRevalidator;
private final ConditionalRequestBuilder<HttpRequest> conditionalRequestBuilder;
private final boolean requestCollapsingEnabled;
private final CacheRequestCollapser collapser;

AsyncCachingExec(final HttpAsyncCache cache, final DefaultAsyncCacheRevalidator cacheRevalidator, final CacheConfig config) {
this(cache, cacheRevalidator, config, false);
}

AsyncCachingExec(
final HttpAsyncCache cache,
final DefaultAsyncCacheRevalidator cacheRevalidator,
final CacheConfig config,
final boolean requestCollapsingEnabled) {
super(config);
this.responseCache = Args.notNull(cache, "Response cache");
this.cacheRevalidator = cacheRevalidator;
this.conditionalRequestBuilder = new ConditionalRequestBuilder<>(request ->
BasicRequestBuilder.copy(request).build());
this.requestCollapsingEnabled = requestCollapsingEnabled;
this.collapser = requestCollapsingEnabled ? new CacheRequestCollapser() : null;
}

AsyncCachingExec(
Expand Down Expand Up @@ -274,6 +287,96 @@ public void completed(final CacheMatch result) {
final CacheHit hit = result != null ? result.hit : null;
final CacheHit root = result != null ? result.root : null;
if (hit == null) {
if (requestCollapsingEnabled && root == null && entityProducer == null && !requestCacheControl.isOnlyIfCached()) {
final String cacheKey = CacheKeyGenerator.INSTANCE.generateKey(target, request);
final CacheRequestCollapser.Token token = collapser.enter(cacheKey);
if (token.isLeader()) {
handleCacheMiss(requestCacheControl, null, target, request, null, scope, chain, new AsyncExecCallback() {

@Override
public AsyncDataConsumer handleResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
try {
return asyncExecCallback.handleResponse(response, entityDetails);
} catch (final HttpException | IOException ex) {
token.complete();
throw ex;
}
}

@Override
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
try {
asyncExecCallback.handleInformationResponse(response);
} catch (final HttpException | IOException ex) {
token.complete();
throw ex;
}
}

@Override
public void completed() {
try {
asyncExecCallback.completed();
} finally {
token.complete();
}
}

@Override
public void failed(final Exception cause) {
try {
asyncExecCallback.failed(cause);
} finally {
token.complete();
}
}

});
} else {
// Stable holder owned by the follower: registered with the outer operation
// exactly once so the cache-lookup dependency installed by the await task
// is never overwritten from the outside.
final ComplexCancellable follower = new ComplexCancellable();
operation.setDependency(follower);
collapser.await(token, follower, () -> {
if (follower.isCancelled()) {
return;
}
follower.setDependency(responseCache.match(target, request, new FutureCallback<CacheMatch>() {

@Override
public void completed(final CacheMatch result) {
final CacheHit hit = result != null ? result.hit : null;
final CacheHit root = result != null ? result.root : null;
if (hit == null) {
handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback);
} else {
final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry);
if (LOG.isDebugEnabled()) {
LOG.debug("{} response cache control: {}", exchangeId, responseCacheControl);
}
context.setResponseCacheControl(responseCacheControl);
handleCacheHit(requestCacheControl, responseCacheControl, hit, target, request, entityProducer, scope, chain, asyncExecCallback);
}
}

@Override
public void failed(final Exception cause) {
asyncExecCallback.failed(cause);
}

@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}

}));
});
}
return;
}
handleCacheMiss(requestCacheControl, root, target, request, entityProducer, scope, chain, asyncExecCallback);
} else {
final ResponseCacheControl responseCacheControl = CacheControlHeaderParser.INSTANCE.parse(hit.entry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.cache;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.util.Args;

/**
* Coordinates concurrent requests for the same cache key so that only one request
* goes to the backend while others wait for it to complete and then re-check the cache.
* <p>
* Each {@link AsyncCachingExec} owns its own instance; collapse state is never shared
* across executors or clients.
*/
@Internal
final class CacheRequestCollapser {

static final class Token {

private final ConcurrentHashMap<String, Entry> inflight;
private final String key;
private final Entry entry;
private final boolean leader;

private Token(final ConcurrentHashMap<String, Entry> inflight, final String key, final Entry entry, final boolean leader) {
this.inflight = inflight;
this.key = key;
this.entry = entry;
this.leader = leader;
}

boolean isLeader() {
return leader;
}

void complete() {
if (entry.completed.compareAndSet(false, true)) {
inflight.remove(key, entry);
entry.drain();
}
}

}

private static final class Waiter implements Cancellable {

private final AtomicBoolean cancelled;
private final Runnable task;

private Waiter(final Runnable task) {
this.cancelled = new AtomicBoolean(false);
this.task = task;
}

@Override
public boolean cancel() {
return cancelled.compareAndSet(false, true);
}

void runIfNotCancelled() {
if (!cancelled.get()) {
task.run();
}
}

}

private static final class Entry {

private final AtomicBoolean completed;
private final ConcurrentLinkedQueue<Waiter> waiters;

private Entry() {
this.completed = new AtomicBoolean(false);
this.waiters = new ConcurrentLinkedQueue<>();
}

private void await(final CancellableDependency holder, final Runnable task) {
if (completed.get()) {
task.run();
return;
}
final Waiter waiter = new Waiter(task);
// Install the waiter into the holder before publishing it to the queue so
// a concurrent drain cannot run the task before cancellation is wired up.
holder.setDependency(waiter);
waiters.add(waiter);
if (completed.get()) {
drain();
}
}

private void drain() {
for (; ; ) {
final Waiter waiter = waiters.poll();
if (waiter == null) {
return;
}
waiter.runIfNotCancelled();
}
}

}

private final ConcurrentHashMap<String, Entry> inflight;

CacheRequestCollapser() {
this.inflight = new ConcurrentHashMap<>();
}

Token enter(final String key) {
Args.notEmpty(key, "Key");
final Entry created = new Entry();
final Entry existing = inflight.putIfAbsent(key, created);
if (existing == null) {
return new Token(inflight, key, created, true);
}
return new Token(inflight, key, existing, false);
}

void await(final Token token, final CancellableDependency holder, final Runnable task) {
Args.notNull(token, "Token");
Args.notNull(holder, "Holder");
Args.notNull(task, "Task");
token.entry.await(holder, task);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class CachingH2AsyncClientBuilder extends H2AsyncClientBuilder {
private SchedulingStrategy schedulingStrategy;
private CacheConfig cacheConfig;
private boolean deleteCache;
private boolean requestCollapsingEnabled;

public static CachingH2AsyncClientBuilder create() {
return new CachingH2AsyncClientBuilder();
Expand Down Expand Up @@ -112,6 +113,18 @@ public CachingH2AsyncClientBuilder setDeleteCache(final boolean deleteCache) {
return this;
}

/**
* Enables request collapsing for cacheable requests. When enabled, concurrent
* requests for the same cache key are coalesced so that only one request goes
* to the backend while the others wait and then re-check the cache.
*
* @since 5.7
*/
public CachingH2AsyncClientBuilder setRequestCollapsingEnabled(final boolean requestCollapsingEnabled) {
this.requestCollapsingEnabled = requestCollapsingEnabled;
return this;
}

@Override
protected void customizeExecChain(final NamedElementChain<AsyncExecChainHandler> execChainDefinition) {
final CacheConfig config = this.cacheConfig != null ? this.cacheConfig : CacheConfig.DEFAULT;
Expand Down Expand Up @@ -156,7 +169,8 @@ protected void customizeExecChain(final NamedElementChain<AsyncExecChainHandler>
final AsyncCachingExec cachingExec = new AsyncCachingExec(
httpCache,
cacheRevalidator,
config);
config,
requestCollapsingEnabled);
execChainDefinition.addBefore(ChainElement.PROTOCOL.name(), cachingExec, ChainElement.CACHING.name());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class CachingHttpAsyncClientBuilder extends HttpAsyncClientBuilder {
private SchedulingStrategy schedulingStrategy;
private CacheConfig cacheConfig;
private boolean deleteCache;
private boolean requestCollapsingEnabled;

public static CachingHttpAsyncClientBuilder create() {
return new CachingHttpAsyncClientBuilder();
Expand Down Expand Up @@ -116,6 +117,18 @@ public CachingHttpAsyncClientBuilder setDeleteCache(final boolean deleteCache) {
return this;
}

/**
* Enables request collapsing for cacheable requests. When enabled, concurrent
* requests for the same cache key are coalesced so that only one request goes
* to the backend while the others wait and then re-check the cache.
*
* @since 5.7
*/
public CachingHttpAsyncClientBuilder setRequestCollapsingEnabled(final boolean requestCollapsingEnabled) {
this.requestCollapsingEnabled = requestCollapsingEnabled;
return this;
}

@Override
protected void customizeExecChain(final NamedElementChain<AsyncExecChainHandler> execChainDefinition) {
final CacheConfig config = this.cacheConfig != null ? this.cacheConfig : CacheConfig.DEFAULT;
Expand Down Expand Up @@ -160,7 +173,8 @@ protected void customizeExecChain(final NamedElementChain<AsyncExecChainHandler>
final AsyncCachingExec cachingExec = new AsyncCachingExec(
httpCache,
cacheRevalidator,
config);
config,
requestCollapsingEnabled);
execChainDefinition.addBefore(ChainElement.PROTOCOL.name(), cachingExec, ChainElement.CACHING.name());
}

Expand Down
Loading
Loading