Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,27 @@ public interface ChannelEndpoint {
String getAddress();

/**
* Returns whether this server is ready to accept RPCs.
* Returns whether this server's channel is in {@code READY} state and can accept location-aware
* RPCs.
*
* <p>A server is considered unhealthy if:
* <p>Only endpoints in {@code READY} state are eligible for location-aware routing. Endpoints in
* {@code IDLE}, {@code CONNECTING}, {@code TRANSIENT_FAILURE}, or {@code SHUTDOWN} are not
* considered healthy for location-aware routing purposes.
*
* <ul>
* <li>The underlying channel is shutdown or terminated
* <li>The channel is in a transient failure state
* </ul>
*
* @return true if the server is healthy and ready to accept RPCs
* @return true if the channel is in READY state
*/
boolean isHealthy();

/**
* Returns whether this server's channel is in {@code TRANSIENT_FAILURE} state.
*
* <p>When an endpoint is in transient failure, it should be reported as a skipped tablet in
* routing hints so the server can refresh the client cache.
*
* @return true if the channel is in TRANSIENT_FAILURE state
*/
boolean isTransientFailure();

/**
* Returns the gRPC channel for making RPCs to this server.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ public interface ChannelEndpointCache {
*/
ChannelEndpoint get(String address);

/**
* Returns a cached channel for the given address without creating it.
*
* <p>Unlike {@link #get(String)}, this method does not create a new endpoint if one does not
* already exist in the cache. This is used by location-aware routing to avoid foreground endpoint
* creation on the request path.
*
* @param address the server address in "host:port" format
* @return the cached channel instance, or null if no endpoint exists for this address
*/
@javax.annotation.Nullable
ChannelEndpoint getIfPresent(String address);

/**
* Evicts a server connection from the cache and gracefully shuts down its channel.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.Group;
import com.google.spanner.v1.Mutation;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RoutingHint;
import com.google.spanner.v1.Tablet;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;

/**
* Finds a server for a request using location-aware routing metadata.
Expand All @@ -44,9 +47,16 @@ public final class ChannelFinder {
private final AtomicLong databaseId = new AtomicLong();
private final KeyRecipeCache recipeCache = new KeyRecipeCache();
private final KeyRangeCache rangeCache;
@Nullable private final EndpointLifecycleManager lifecycleManager;

public ChannelFinder(ChannelEndpointCache endpointCache) {
this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache));
this(endpointCache, null);
}

public ChannelFinder(
ChannelEndpointCache endpointCache, @Nullable EndpointLifecycleManager lifecycleManager) {
this.rangeCache = new KeyRangeCache(Objects.requireNonNull(endpointCache), lifecycleManager);
this.lifecycleManager = lifecycleManager;
}

void useDeterministicRandom() {
Expand All @@ -67,6 +77,19 @@ public void update(CacheUpdate update) {
recipeCache.addRecipes(update.getKeyRecipes());
}
rangeCache.addRanges(update);

// Notify the lifecycle manager about server addresses so it can create endpoints
// in the background and start probing.
if (lifecycleManager != null) {
for (Group group : update.getGroupList()) {
for (Tablet tablet : group.getTabletsList()) {
String addr = tablet.getServerAddress();
if (!addr.isEmpty()) {
lifecycleManager.ensureEndpointExists(addr);
}
}
}
}
}
}

Expand Down
Loading
Loading