diff --git a/.github/workflows/unmanaged_dependency_check.yaml b/.github/workflows/unmanaged_dependency_check.yaml
index 512d306a51..b484ee0606 100644
--- a/.github/workflows/unmanaged_dependency_check.yaml
+++ b/.github/workflows/unmanaged_dependency_check.yaml
@@ -14,6 +14,6 @@ jobs:
shell: bash
run: .kokoro/build.sh
- name: Unmanaged dependency check
- uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.52.1
+ uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.52.2
with:
bom-path: google-cloud-bigtable-bom/pom.xml
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4ac852cab2..61b22d277b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,24 @@
# Changelog
+## [2.67.0](https://github.com/googleapis/java-bigtable/compare/v2.66.0...v2.67.0) (2025-09-24)
+
+
+### Features
+
+* Idle channel eviction ([#2651](https://github.com/googleapis/java-bigtable/issues/2651)) ([70c05c9](https://github.com/googleapis/java-bigtable/commit/70c05c9c09a63c53818384d2a66c622c9b95e00e))
+* Load balancing options for BigtableChannelPool ([#2667](https://github.com/googleapis/java-bigtable/issues/2667)) ([5adaa84](https://github.com/googleapis/java-bigtable/commit/5adaa84d80df08779da7c36a50de4632049cfe96))
+
+
+### Bug Fixes
+
+* Add missing break; to PROTO and ENUM value type check ([#2672](https://github.com/googleapis/java-bigtable/issues/2672)) ([337e432](https://github.com/googleapis/java-bigtable/commit/337e4325f6cb5d11309ec5f33550d47d97cbe3c3))
+* Remove beta api annotation for query paginator ([#2660](https://github.com/googleapis/java-bigtable/issues/2660)) ([f68a1fa](https://github.com/googleapis/java-bigtable/commit/f68a1fae49b701d1fb9942e2af2fa84a1e5b508a))
+
+
+### Dependencies
+
+* Update shared dependencies ([#2679](https://github.com/googleapis/java-bigtable/issues/2679)) ([a5b8260](https://github.com/googleapis/java-bigtable/commit/a5b82609c365ae4792ed822e59039c1a046ef3ff))
+
## [2.66.0](https://github.com/googleapis/java-bigtable/compare/v2.65.1...v2.66.0) (2025-09-10)
diff --git a/README.md b/README.md
index 092b215ec3..4732358aba 100644
--- a/README.md
+++ b/README.md
@@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigtable'
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
-implementation 'com.google.cloud:google-cloud-bigtable:2.66.0'
+implementation 'com.google.cloud:google-cloud-bigtable:2.67.0'
```
If you are using SBT, add this to your dependencies:
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.66.0"
+libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.67.0"
```
## Authentication
@@ -470,7 +470,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigtable/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigtable.svg
-[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigtable/2.66.0
+[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigtable/2.67.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
diff --git a/google-cloud-bigtable-bom/pom.xml b/google-cloud-bigtable-bom/pom.xml
index 74c550330d..8325a15a4f 100644
--- a/google-cloud-bigtable-bom/pom.xml
+++ b/google-cloud-bigtable-bom/pom.xml
@@ -3,12 +3,12 @@
4.0.0
com.google.cloud
google-cloud-bigtable-bom
- 2.66.0
+ 2.67.0
pom
com.google.cloud
sdk-platform-java-config
- 3.52.1
+ 3.52.2
@@ -63,37 +63,37 @@
com.google.cloud
google-cloud-bigtable
- 2.66.0
+ 2.67.0
com.google.cloud
google-cloud-bigtable-emulator
- 0.203.0
+ 0.204.0
com.google.cloud
google-cloud-bigtable-emulator-core
- 0.203.0
+ 0.204.0
com.google.api.grpc
grpc-google-cloud-bigtable-admin-v2
- 2.66.0
+ 2.67.0
com.google.api.grpc
grpc-google-cloud-bigtable-v2
- 2.66.0
+ 2.67.0
com.google.api.grpc
proto-google-cloud-bigtable-admin-v2
- 2.66.0
+ 2.67.0
com.google.api.grpc
proto-google-cloud-bigtable-v2
- 2.66.0
+ 2.67.0
diff --git a/google-cloud-bigtable-deps-bom/pom.xml b/google-cloud-bigtable-deps-bom/pom.xml
index eaf16d2455..2da2422484 100644
--- a/google-cloud-bigtable-deps-bom/pom.xml
+++ b/google-cloud-bigtable-deps-bom/pom.xml
@@ -7,13 +7,13 @@
com.google.cloud
sdk-platform-java-config
- 3.52.1
+ 3.52.2
com.google.cloud
google-cloud-bigtable-deps-bom
- 2.66.0
+ 2.67.0
pom
Google Cloud Bigtable Dependency BOM
@@ -68,7 +68,7 @@
com.google.cloud
gapic-libraries-bom
- 1.67.0
+ 1.69.0
pom
import
diff --git a/google-cloud-bigtable-emulator-core/pom.xml b/google-cloud-bigtable-emulator-core/pom.xml
index 935dd1aa28..c8f05a4e33 100644
--- a/google-cloud-bigtable-emulator-core/pom.xml
+++ b/google-cloud-bigtable-emulator-core/pom.xml
@@ -7,12 +7,12 @@
google-cloud-bigtable-parent
com.google.cloud
- 2.66.0
+ 2.67.0
Google Cloud Java - Bigtable Emulator Core
google-cloud-bigtable-emulator-core
- 0.203.0
+ 0.204.0
A Java wrapper for the Cloud Bigtable emulator.
diff --git a/google-cloud-bigtable-emulator/pom.xml b/google-cloud-bigtable-emulator/pom.xml
index 45e4f70947..b6f558afab 100644
--- a/google-cloud-bigtable-emulator/pom.xml
+++ b/google-cloud-bigtable-emulator/pom.xml
@@ -5,7 +5,7 @@
4.0.0
google-cloud-bigtable-emulator
- 0.203.0
+ 0.204.0
Google Cloud Java - Bigtable Emulator
https://github.com/googleapis/java-bigtable
@@ -14,7 +14,7 @@
com.google.cloud
google-cloud-bigtable-parent
- 2.66.0
+ 2.67.0
scm:git:git@github.com:googleapis/java-bigtable.git
@@ -81,14 +81,14 @@
com.google.cloud
google-cloud-bigtable-deps-bom
- 2.66.0
+ 2.67.0
pom
import
com.google.cloud
google-cloud-bigtable-bom
- 2.66.0
+ 2.67.0
pom
import
@@ -99,7 +99,7 @@
com.google.cloud
google-cloud-bigtable-emulator-core
- 0.203.0
+ 0.204.0
diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml
index 42ed3e3f7e..fcdb15efa7 100644
--- a/google-cloud-bigtable/clirr-ignored-differences.xml
+++ b/google-cloud-bigtable/clirr-ignored-differences.xml
@@ -349,6 +349,26 @@
com/google/cloud/bigtable/data/v2/models/sql/SqlType
*protoOf(*)
+
+ 7004
+ com/google/cloud/bigtable/common/Type$SchemalessProto
+ *
+
+
+ 7004
+ com/google/cloud/bigtable/common/Type$SchemalessEnum
+ *
+
+
+ 7013
+ com/google/cloud/bigtable/common/Type$SchemalessProto
+ *
+
+
+ 7013
+ com/google/cloud/bigtable/common/Type$SchemalessEnum
+ *
+
7012
@@ -426,4 +446,46 @@
*create*
*
+
+ 4001
+ com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer
+ com/google/api/gax/grpc/ChannelPrimer
+
+
+ 4001
+ com/google/cloud/bigtable/data/v2/stub/NoOpChannelPrimer
+ com/google/api/gax/grpc/ChannelPrimer
+
+
+ 7005
+ com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool
+ *create*
+ *
+
+
+
+ 7005
+ com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider
+ *create*
+ *
+
+
+
+ 7006
+ com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer
+ *sendPrimeRequestsAsync*
+ com.google.api.core.ApiFuture
+
+
+
+ 7013
+ com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolSettings
+ com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPoolSettings$LoadBalancingStrategy getLoadBalancingStrategy()
+
+
+
+ 7013
+ com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolSettings$Builder
+ com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPoolSettings$Builder setLoadBalancingStrategy(com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPoolSettings$LoadBalancingStrategy)
+
diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml
index 9a0aff1d3d..02797097f7 100644
--- a/google-cloud-bigtable/pom.xml
+++ b/google-cloud-bigtable/pom.xml
@@ -2,7 +2,7 @@
4.0.0
google-cloud-bigtable
- 2.66.0
+ 2.67.0
jar
Google Cloud Bigtable
https://github.com/googleapis/java-bigtable
@@ -12,11 +12,11 @@
com.google.cloud
google-cloud-bigtable-parent
- 2.66.0
+ 2.67.0
- 2.66.0
+ 2.67.0
google-cloud-bigtable
@@ -54,14 +54,14 @@
com.google.cloud
google-cloud-bigtable-deps-bom
- 2.66.0
+ 2.67.0
pom
import
com.google.cloud
google-cloud-bigtable-bom
- 2.66.0
+ 2.67.0
pom
import
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/Version.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/Version.java
index 90014f7ab4..6d36a55a71 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/Version.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/Version.java
@@ -20,6 +20,6 @@
@InternalApi("For internal use only")
public final class Version {
// {x-version-update-start:google-cloud-bigtable:current}
- public static String VERSION = "2.66.0";
+ public static String VERSION = "2.67.0";
// {x-version-update-end}
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/common/Type.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/common/Type.java
index 29add532b2..35a11f8c5b 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/common/Type.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/common/Type.java
@@ -505,16 +505,19 @@ public int hashCode() {
abstract class SchemalessProto implements SqlType.Proto {
public static SchemalessProto fromProto(com.google.bigtable.v2.Type.Proto proto) {
- return create(proto.getMessageName());
+ return create(proto.getMessageName(), proto.getSchemaBundleId());
}
- public static SchemalessProto create(java.lang.String messageName) {
- return new AutoValue_Type_SchemalessProto(messageName);
+ public static SchemalessProto create(
+ java.lang.String messageName, java.lang.String schemaBundleId) {
+ return new AutoValue_Type_SchemalessProto(messageName, schemaBundleId);
}
@Override
public abstract java.lang.String getMessageName();
+ public abstract java.lang.String schemaBundleId();
+
@Override
public Parser getParserForType() {
throw new UnsupportedOperationException(
@@ -529,7 +532,12 @@ public Code getCode() {
@Override
public java.lang.String toString() {
- return getCode().name() + "{messageName=" + getMessageName() + "}";
+ return getCode().name()
+ + "{messageName="
+ + getMessageName()
+ + ", schemaBundleId="
+ + schemaBundleId()
+ + "}";
}
}
@@ -544,15 +552,18 @@ public java.lang.String toString() {
abstract class SchemalessEnum implements SqlType.Enum {
public static SchemalessEnum fromProto(com.google.bigtable.v2.Type.Enum proto) {
- return create(proto.getEnumName());
+ return create(proto.getEnumName(), proto.getSchemaBundleId());
}
- public static SchemalessEnum create(java.lang.String enumName) {
- return new AutoValue_Type_SchemalessEnum(enumName);
+ public static SchemalessEnum create(
+ java.lang.String enumName, java.lang.String schemaBundleId) {
+ return new AutoValue_Type_SchemalessEnum(enumName, schemaBundleId);
}
public abstract java.lang.String getEnumName();
+ public abstract java.lang.String schemaBundleId();
+
@Override
public Function getForNumber() {
throw new UnsupportedOperationException(
@@ -567,7 +578,12 @@ public Code getCode() {
@Override
public java.lang.String toString() {
- return getCode().name() + "{enumName=" + getEnumName() + "}";
+ return getCode().name()
+ + "{enumName="
+ + getEnumName()
+ + ", schemaBundleId="
+ + schemaBundleId()
+ + "}";
}
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java
index 3708c25def..2f579e2bd2 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java
@@ -366,7 +366,6 @@ private static ByteString wrapKey(String key) {
* A Query Paginator that will split a query into small chunks. See {@link
* Query#createPaginator(int)} for example usage.
*/
- @BetaApi("This surface is stable yet it might be removed in the future.")
public static class QueryPaginator {
private final boolean hasOverallLimit;
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/TableId.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/TableId.java
index 1b19e75d69..f743128212 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/TableId.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/TableId.java
@@ -31,7 +31,7 @@ public static TableId of(String tableId) {
return new AutoValue_TableId(tableId);
}
- abstract String getTableId();
+ public abstract String getTableId();
@Override
@InternalApi
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java
index 4ace6c7567..97c6e364c8 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java
@@ -15,14 +15,15 @@
*/
package com.google.cloud.bigtable.data.v2.stub;
+import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
-import com.google.api.gax.grpc.ChannelPrimer;
import com.google.auth.Credentials;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
+import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
@@ -110,8 +111,7 @@ private void sendPrimeRequestsBlocking(ManagedChannel managedChannel) {
}
}
- public SettableApiFuture sendPrimeRequestsAsync(
- ManagedChannel managedChannel) {
+ public ApiFuture sendPrimeRequestsAsync(ManagedChannel managedChannel) {
ClientCall clientCall =
managedChannel.newCall(
BigtableGrpc.getPingAndWarmMethod(),
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java
index 233294fe4e..92a984a015 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java
@@ -20,7 +20,6 @@
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
-import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.ClientContext;
import com.google.auth.Credentials;
@@ -34,6 +33,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider;
import com.google.cloud.bigtable.gaxx.grpc.BigtableTransportChannelProvider;
+import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer;
import io.grpc.ManagedChannelBuilder;
import io.grpc.opentelemetry.GrpcOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/NoOpChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/NoOpChannelPrimer.java
index aed412fd0d..3cb98d9dee 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/NoOpChannelPrimer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/NoOpChannelPrimer.java
@@ -15,8 +15,11 @@
*/
package com.google.cloud.bigtable.data.v2.stub;
+import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
-import com.google.api.gax.grpc.ChannelPrimer;
+import com.google.api.core.SettableApiFuture;
+import com.google.bigtable.v2.PingAndWarmResponse;
+import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer;
import io.grpc.ManagedChannel;
@InternalApi
@@ -28,7 +31,14 @@ static NoOpChannelPrimer create() {
private NoOpChannelPrimer() {}
@Override
- public void primeChannel(ManagedChannel managedChannel) {
+ public void primeChannel(ManagedChannel channel) {
// No op
}
+
+ /**
+ * No-op counterpart of an asynchronous prime: the given channel is never used and no request
+ * is sent.
+ *
+ * @return a future that is already completed with the default {@link PingAndWarmResponse}.
+ */
+ @Override
+ public ApiFuture sendPrimeRequestsAsync(ManagedChannel channel) {
+ SettableApiFuture future = SettableApiFuture.create();
+ future.set(PingAndWarmResponse.getDefaultInstance());
+ return future;
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/sql/ProtoRowsMergingStateMachine.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/sql/ProtoRowsMergingStateMachine.java
index 61788fc4b1..351656dcd4 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/sql/ProtoRowsMergingStateMachine.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/sql/ProtoRowsMergingStateMachine.java
@@ -199,9 +199,11 @@ static void validateValueAndType(SqlType> type, Value value) {
checkExpectedKind(value, Value.KindCase.STRING_VALUE, type);
break;
case BYTES:
+ case PROTO:
checkExpectedKind(value, Value.KindCase.BYTES_VALUE, type);
break;
case INT64:
+ case ENUM:
checkExpectedKind(value, Value.KindCase.INT_VALUE, type);
break;
case FLOAT64:
@@ -253,10 +255,6 @@ static void validateValueAndType(SqlType> type, Value value) {
mapType.getValueType(), mapElement.getArrayValue().getValuesList().get(1));
}
break;
- case PROTO:
- checkExpectedKind(value, Value.KindCase.BYTES_VALUE, type);
- case ENUM:
- checkExpectedKind(value, Value.KindCase.INT_VALUE, type);
default:
// This should be caught already at ResultSetMetadata creation
throw new IllegalStateException("Unrecognized type: " + type);
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java
index da7bd4f956..173722f2f4 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java
@@ -17,7 +17,7 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.ChannelFactory;
-import com.google.api.gax.grpc.ChannelPrimer;
+import com.google.cloud.bigtable.gaxx.grpc.ChannelPoolHealthChecker.ProbeResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -31,15 +31,19 @@
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.io.IOException;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -64,11 +68,13 @@ public class BigtableChannelPool extends ManagedChannel {
private final ChannelPrimer channelPrimer;
private final ScheduledExecutorService executor;
-
private final Object entryWriteLock = new Object();
@VisibleForTesting final AtomicReference> entries = new AtomicReference<>();
+ private final ChannelPoolHealthChecker channelPoolHealthChecker;
private final AtomicInteger indexTicker = new AtomicInteger();
private final String authority;
+ private final Random rng = new Random();
+ private final Supplier picker;
public static BigtableChannelPool create(
BigtableChannelPoolSettings settings,
@@ -96,6 +102,10 @@ public static BigtableChannelPool create(
this.settings = settings;
this.channelFactory = channelFactory;
this.channelPrimer = channelPrimer;
+ Clock systemClock = Clock.systemUTC();
+ this.channelPoolHealthChecker =
+ new ChannelPoolHealthChecker(entries::get, channelPrimer, executor, systemClock);
+ this.channelPoolHealthChecker.start();
ImmutableList.Builder initialListBuilder = ImmutableList.builder();
@@ -107,6 +117,23 @@ public static BigtableChannelPool create(
entries.set(initialListBuilder.build());
authority = entries.get().get(0).channel.authority();
+
+ switch (settings.getLoadBalancingStrategy()) {
+ case ROUND_ROBIN:
+ picker = this::pickEntryIndexRoundRobin;
+ break;
+ case LEAST_IN_FLIGHT:
+ picker = this::pickEntryIndexLeastInFlight;
+ break;
+ case POWER_OF_TWO_LEAST_IN_FLIGHT:
+ picker = this::pickEntryIndexPowerOfTwoLeastInFlight;
+ break;
+ default:
+ throw new IllegalStateException(
+ String.format(
+ "Unknown load balancing strategy %s", settings.getLoadBalancingStrategy()));
+ }
+
this.executor = executor;
if (!settings.isStaticSize()) {
@@ -132,19 +159,74 @@ public String authority() {
}
/**
- * Create a {@link ClientCall} on a Channel from the pool chosen in a round-robin fashion to the
- * remote operation specified by the given {@link MethodDescriptor}. The returned {@link
- * ClientCall} does not trigger any remote behavior until {@link
- * ClientCall#start(ClientCall.Listener, io.grpc.Metadata)} is invoked.
+ * Create a {@link ClientCall} on a Channel from the pool to the remote operation specified by the
+ * given {@link MethodDescriptor}. The returned {@link ClientCall} does not trigger any remote
+ * behavior until {@link ClientCall#start(ClientCall.Listener, io.grpc.Metadata)} is invoked.
*/
@Override
public ClientCall newCall(
MethodDescriptor methodDescriptor, CallOptions callOptions) {
- return getChannel(indexTicker.getAndIncrement()).newCall(methodDescriptor, callOptions);
+ return new AffinityChannel(pickEntryIndex()).newCall(methodDescriptor, callOptions);
+ }
+
+ /**
+ * Pick the index of an entry to use for the next call. The returned value *should* be within
+ * range, but callers should not assume that this is always the case as race conditions are
+ * possible.
+ */
+ private int pickEntryIndex() {
+ return picker.get();
+ }
+
+ /**
+ * Pick an entry using the Round Robin algorithm.
+ *
+ * <p>The ticker is an int counter that becomes negative once it wraps around, which makes the
+ * remainder negative too. Math.abs is applied to the remainder (whose magnitude is always below
+ * the entry count) so the returned index is never negative.
+ */
+ private int pickEntryIndexRoundRobin() {
+ return Math.abs(indexTicker.getAndIncrement() % entries.get().size());
+ }
+
+ /** Pick an entry at random. */
+ private int pickEntryIndexRandom() {
+ return rng.nextInt(entries.get().size());
+ }
+
+ /** Pick an entry using the least-in-flight algorithm. */
+ private int pickEntryIndexLeastInFlight() {
+ List localEntries = entries.get();
+ int minRpcs = Integer.MAX_VALUE;
+ List candidates = new ArrayList<>();
+
+ for (int i = 0; i < localEntries.size(); i++) {
+ Entry entry = localEntries.get(i);
+ int rpcs = entry.outstandingRpcs.get();
+ if (rpcs < minRpcs) {
+ minRpcs = rpcs;
+ candidates.clear();
+ candidates.add(i);
+ } else if (rpcs == minRpcs) {
+ candidates.add(i);
+ }
+ }
+ // If there are multiple matching entries, pick one at random.
+ return candidates.get(rng.nextInt(candidates.size()));
}
- Channel getChannel(int affinity) {
- return new AffinityChannel(affinity);
+ /**
+  * Pick an entry using the power-of-two algorithm: draw two random indices and return the one
+  * whose entry currently has fewer outstanding RPCs (the second draw wins ties).
+  *
+  * <p>Both indices are drawn against the size of a single snapshot of the entry list, so they
+  * are always valid for the snapshot that is dereferenced below, even if the entry list is
+  * swapped for a larger or smaller one concurrently.
+  */
+ private int pickEntryIndexPowerOfTwoLeastInFlight() {
+   List localEntries = entries.get();
+   // Do not re-read entries here: an index drawn from a newer, larger list could be out of
+   // bounds for localEntries.
+   int snapshotSize = localEntries.size();
+   int choice1 = rng.nextInt(snapshotSize);
+   int choice2 = rng.nextInt(snapshotSize);
+   if (choice1 == choice2) {
+     // Try to pick two different entries. If this picks the same entry again, it's likely that
+     // there's only one healthy channel in the pool and we should proceed anyway.
+     choice2 = rng.nextInt(snapshotSize);
+   }
+
+   Entry entry1 = localEntries.get(choice1);
+   Entry entry2 = localEntries.get(choice2);
+   return entry1.outstandingRpcs.get() < entry2.outstandingRpcs.get() ? choice1 : choice2;
+ }
+
+ Channel getChannel(int index) {
+ return new AffinityChannel(index);
}
/** {@inheritDoc} */
@@ -389,7 +471,9 @@ void refresh() {
* Get and retain a Channel Entry. The returned Entry will have its rpc count incremented,
* preventing it from getting recycled.
*/
- Entry getRetainedEntry(int affinity) {
+ private Entry getRetainedEntry(int affinity) {
+ // If an entry is not retainable, that usually means that it's about to be replaced and if we
+ // retry we should get a new useable entry.
// The maximum number of concurrent calls to this method for any given time span is at most 2,
// so the loop can actually be 2 times. But going for 5 times for a safety margin for potential
// code evolving
@@ -445,15 +529,32 @@ static class Entry {
private final AtomicInteger maxOutstanding = new AtomicInteger();
- // Flag that the channel should be closed once all of the outstanding RPC complete.
+ /** Queue storing the last 5 minutes of probe results */
+ @VisibleForTesting
+ final ConcurrentLinkedQueue probeHistory = new ConcurrentLinkedQueue<>();
+
+ /**
+ * Keep both # of failed and # of successful probes so that we don't have to check size() on the
+ * ConcurrentLinkedQueue all the time
+ */
+ final AtomicInteger failedProbesInWindow = new AtomicInteger();
+
+ final AtomicInteger successfulProbesInWindow = new AtomicInteger();
+
+ // Flag that the channel should be closed once all the outstanding RPCs complete.
private final AtomicBoolean shutdownRequested = new AtomicBoolean();
// Flag that the channel has been closed.
private final AtomicBoolean shutdownInitiated = new AtomicBoolean();
- private Entry(ManagedChannel channel) {
+ @VisibleForTesting
+ Entry(ManagedChannel channel) {
this.channel = channel;
}
+ ManagedChannel getManagedChannel() {
+ return this.channel;
+ }
+
int getAndResetMaxOutstanding() {
return maxOutstanding.getAndSet(outstandingRpcs.get());
}
@@ -468,7 +569,7 @@ private boolean retain() {
// register desire to start RPC
int currentOutstanding = outstandingRpcs.incrementAndGet();
- // Rough book keeping
+ // Rough bookkeeping
int prevMax = maxOutstanding.get();
if (currentOutstanding > prevMax) {
maxOutstanding.incrementAndGet();
@@ -520,10 +621,10 @@ private void shutdown() {
/** Thin wrapper to ensure that new calls are properly reference counted. */
private class AffinityChannel extends Channel {
- private final int affinity;
+ private final int index;
- public AffinityChannel(int affinity) {
- this.affinity = affinity;
+ public AffinityChannel(int index) {
+ this.index = index;
}
@Override
@@ -534,9 +635,7 @@ public String authority() {
@Override
public ClientCall newCall(
MethodDescriptor methodDescriptor, CallOptions callOptions) {
-
- Entry entry = getRetainedEntry(affinity);
-
+ Entry entry = getRetainedEntry(index);
return new ReleasingClientCall<>(entry.channel.newCall(methodDescriptor, callOptions), entry);
}
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolSettings.java
index e94a7665e8..4ef21418ed 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolSettings.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolSettings.java
@@ -16,10 +16,14 @@
package com.google.cloud.bigtable.gaxx.grpc;
import com.google.api.core.BetaApi;
+import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.ChannelPoolSettings;
import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import java.time.Duration;
+import java.util.logging.Logger;
/**
* Settings to control {@link BigtableChannelPool} behavior.
@@ -41,12 +45,33 @@
@BetaApi("surface for channel pool sizing is not yet stable")
@AutoValue
public abstract class BigtableChannelPoolSettings {
+ @VisibleForTesting
+ static final Logger LOG = Logger.getLogger(BigtableChannelPoolSettings.class.getName());
+
/** How often to check and possibly resize the {@link BigtableChannelPool}. */
static final Duration RESIZE_INTERVAL = Duration.ofMinutes(1);
/** The maximum number of channels that can be added or removed at a time. */
static final int MAX_RESIZE_DELTA = 2;
+ /** Environment variable used to set load balancing strategy. */
+ private static final String CBT_LOAD_BALANCING_STRATEGY_ENV_VAR = "CBT_LOAD_BALANCING_STRATEGY";
+
+ /** Load balancing strategy to use if the environment variable is unset or empty. */
+ private static final LoadBalancingStrategy DEFAULT_LOAD_BALANCING_STRATEGY =
+ LoadBalancingStrategy.ROUND_ROBIN;
+
+ /**
+ * Supported load-balancing strategies.
+ *
+ * <p>The constant names are what the CBT_LOAD_BALANCING_STRATEGY environment variable is matched
+ * against, after the value has been trimmed and upper-cased.
+ */
+ public enum LoadBalancingStrategy {
+ /** Sequentially iterate across all channels. */
+ ROUND_ROBIN,
+ /**
+ * Pick the channel with the fewest in-flight requests. If multiple channels match, pick at
+ * random.
+ */
+ LEAST_IN_FLIGHT,
+ /** Out of two random channels, pick the channel with the fewest in-flight requests. */
+ POWER_OF_TWO_LEAST_IN_FLIGHT,
+ }
+
/**
* Threshold to start scaling down the channel pool.
*
@@ -95,6 +120,10 @@ public abstract class BigtableChannelPoolSettings {
*/
public abstract boolean isPreemptiveRefreshEnabled();
+ /** The load balancing strategy to use for distributing RPCs across channels. */
+ @InternalApi("Use CBT_LOAD_BALANCING_STRATEGY environment variable")
+ public abstract LoadBalancingStrategy getLoadBalancingStrategy();
+
/**
* Helper to check if the {@link BigtableChannelPool} implementation can skip dynamic size logic
*/
@@ -111,6 +140,24 @@ boolean isStaticSize() {
return false;
}
+ /**
+  * Use environment variable CBT_LOAD_BALANCING_STRATEGY to pick a load-balancing strategy.
+  *
+  * <p>The value is trimmed and matched case-insensitively against the {@link
+  * LoadBalancingStrategy} constant names.
+  *
+  * @return the strategy named by the variable, or the default when it is unset or empty.
+  * @throws IllegalStateException if the variable is set to a value that names no strategy.
+  */
+ private static LoadBalancingStrategy loadBalancingStrategyFromEnv() {
+   String strategyString = System.getenv(CBT_LOAD_BALANCING_STRATEGY_ENV_VAR);
+   if (Strings.isNullOrEmpty(strategyString)) {
+     return DEFAULT_LOAD_BALANCING_STRATEGY;
+   }
+   try {
+     // Upper-case with Locale.ROOT: under e.g. a Turkish default locale "i" maps to a dotted
+     // capital I, which would make valid names such as "least_in_flight" fail the lookup.
+     return LoadBalancingStrategy.valueOf(
+         strategyString.trim().toUpperCase(java.util.Locale.ROOT));
+   } catch (IllegalArgumentException e) {
+     // Keep the original exception as the cause so the failing lookup stays diagnosable.
+     throw new IllegalStateException(
+         String.format("Invalid load-balancing strategy %s", strategyString), e);
+   }
+ }
+
public abstract Builder toBuilder();
public static BigtableChannelPoolSettings copyFrom(ChannelPoolSettings externalSettings) {
@@ -121,6 +168,7 @@ public static BigtableChannelPoolSettings copyFrom(ChannelPoolSettings externalS
.setMaxChannelCount(externalSettings.getMaxChannelCount())
.setInitialChannelCount(externalSettings.getInitialChannelCount())
.setPreemptiveRefreshEnabled(externalSettings.isPreemptiveRefreshEnabled())
+ .setLoadBalancingStrategy(loadBalancingStrategyFromEnv())
.build();
}
@@ -131,6 +179,7 @@ public static BigtableChannelPoolSettings staticallySized(int size) {
.setMaxRpcsPerChannel(Integer.MAX_VALUE)
.setMinChannelCount(size)
.setMaxChannelCount(size)
+ .setLoadBalancingStrategy(loadBalancingStrategyFromEnv())
.build();
}
@@ -141,7 +190,8 @@ public static Builder builder() {
.setMaxChannelCount(200)
.setMinRpcsPerChannel(0)
.setMaxRpcsPerChannel(Integer.MAX_VALUE)
- .setPreemptiveRefreshEnabled(false);
+ .setPreemptiveRefreshEnabled(false)
+ .setLoadBalancingStrategy(loadBalancingStrategyFromEnv());
}
@AutoValue.Builder
@@ -158,6 +208,9 @@ public abstract static class Builder {
public abstract Builder setPreemptiveRefreshEnabled(boolean enabled);
+ /**
+  * Sets the load-balancing strategy on the settings being built.
+  *
+  * <p>Marked internal: per the annotation message, the strategy is meant to be selected through
+  * the CBT_LOAD_BALANCING_STRATEGY environment variable rather than by calling this setter.
+  */
+ @InternalApi("Use CBT_LOAD_BALANCING_STRATEGY environment variable")
+ public abstract Builder setLoadBalancingStrategy(LoadBalancingStrategy strategy);
+
abstract BigtableChannelPoolSettings autoBuild();
public BigtableChannelPoolSettings build() {
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java
index 3c4cf24bca..ba18994619 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java
@@ -18,7 +18,6 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.ChannelFactory;
import com.google.api.gax.grpc.ChannelPoolSettings;
-import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.TransportChannel;
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPoolHealthChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPoolHealthChecker.java
new file mode 100644
index 0000000000..5c5c689810
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPoolHealthChecker.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.gaxx.grpc;
+
+import com.google.api.core.ApiFuture;
+import com.google.auto.value.AutoValue;
+import com.google.bigtable.v2.PingAndWarmResponse;
+import com.google.cloud.bigtable.data.v2.stub.BigtableChannelPrimer;
+import com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPool.Entry;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+/** Class that manages the health checking in the BigtableChannelPool */
+class ChannelPoolHealthChecker {
+
+ private static final Logger logger = Logger.getLogger(ChannelPoolHealthChecker.class.getName());
+
+ // Configuration constants
+ // Window_Duration is the duration over which we keep probe results
+ private static final Duration WINDOW_DURATION = Duration.ofMinutes(5);
+ // Interval at which we probe channel health
+ private static final Duration PROBE_INTERVAL = Duration.ofSeconds(30);
+ // Timeout deadline for a probe
+ @VisibleForTesting static final Duration PROBE_DEADLINE = Duration.ofMillis(500);
+ // Minimum interval between new idle channel evictions
+ private static final Duration MIN_EVICTION_INTERVAL = Duration.ofMinutes(10);
+ // Minimum number of probes that must be sent to a channel before it will be considered for
+ // eviction
+ private static final int MIN_PROBES_FOR_EVALUATION = 4;
+ // Percentage of probes that must fail for a channel to be considered unhealthy
+ private static final int SINGLE_CHANNEL_FAILURE_PERCENT_THRESHOLD = 60;
+ // "Circuitbreaker" - If this or a higher percentage of channels in a pool are bad, we will not
+ // evict any channels
+ private static final int POOLWIDE_BAD_CHANNEL_CIRCUITBREAKER_PERCENT = 70;
+
+ /** Inner class to represent the result of a single probe. */
+ @AutoValue
+ abstract static class ProbeResult {
+ abstract Instant startTime();
+
+ abstract boolean isSuccessful();
+
+ static ProbeResult create(Instant startTime, boolean success) {
+ return new AutoValue_ChannelPoolHealthChecker_ProbeResult(startTime, success);
+ }
+ }
+
+ private final Supplier> entrySupplier;
+ private volatile Instant lastEviction;
+ private final ScheduledExecutorService executor;
+
+ private final ChannelPrimer channelPrimer;
+
+ private ScheduledFuture> probeTaskScheduledFuture;
+ private ScheduledFuture> detectAndRemoveTaskScheduledFuture;
+
+ private final Clock clock;
+
+ /** Constructor for the pool health checker. */
+ public ChannelPoolHealthChecker(
+ Supplier> entrySupplier,
+ ChannelPrimer channelPrimer,
+ ScheduledExecutorService executor,
+ Clock clock) {
+ this.entrySupplier = entrySupplier;
+ this.lastEviction = Instant.MIN;
+ this.channelPrimer = channelPrimer;
+ this.executor = executor;
+ this.clock = clock;
+ }
+
+ void start() {
+ if (!(channelPrimer instanceof BigtableChannelPrimer)) {
+ logger.log(
+ Level.WARNING,
+ "Provided channelPrimer not an instance of BigtableChannelPrimer, not checking channel"
+ + " health.");
+ return;
+ }
+
+ Duration initialDelayProbe =
+ Duration.ofMillis(ThreadLocalRandom.current().nextLong(PROBE_INTERVAL.toMillis()));
+ this.probeTaskScheduledFuture =
+ executor.scheduleAtFixedRate(
+ this::runProbes,
+ initialDelayProbe.toMillis(),
+ PROBE_INTERVAL.toMillis(),
+ TimeUnit.MILLISECONDS);
+ Duration initialDelayDetect =
+ Duration.ofMillis(ThreadLocalRandom.current().nextLong(PROBE_INTERVAL.toMillis()));
+ this.detectAndRemoveTaskScheduledFuture =
+ executor.scheduleAtFixedRate(
+ this::detectAndRemoveOutlierEntries,
+ initialDelayDetect.toMillis(),
+ PROBE_INTERVAL.toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ /** Stop running health checking */
+ public void stop() {
+ if (probeTaskScheduledFuture != null) {
+ probeTaskScheduledFuture.cancel(false);
+ }
+ if (detectAndRemoveTaskScheduledFuture != null) {
+ detectAndRemoveTaskScheduledFuture.cancel(false);
+ }
+ }
+
+ /** Runs probes on all the channels in the pool. */
+ @VisibleForTesting
+ void runProbes() {
+ Preconditions.checkState(
+ channelPrimer instanceof BigtableChannelPrimer,
+ "Health checking can only be enabled with BigtableChannelPrimer, found %s",
+ channelPrimer);
+ BigtableChannelPrimer primer = (BigtableChannelPrimer) channelPrimer;
+
+ for (Entry entry : this.entrySupplier.get()) {
+ ApiFuture probeFuture =
+ primer.sendPrimeRequestsAsync(entry.getManagedChannel());
+ probeFuture.addListener(
+ () -> onComplete(entry, clock.instant(), probeFuture), MoreExecutors.directExecutor());
+ }
+ }
+
+ /** Callback that will update Entry data on probe complete. */
+ @VisibleForTesting
+ void onComplete(Entry entry, Instant startTime, ApiFuture probeFuture) {
+ boolean success;
+ try {
+ probeFuture.get(PROBE_DEADLINE.toMillis(), TimeUnit.MILLISECONDS);
+ success = true;
+ } catch (Exception e) {
+ success = false;
+ logger.log(Level.WARNING, "Probe failed", e);
+ }
+ addProbeResult(entry, ProbeResult.create(startTime, success));
+ }
+
+ @VisibleForTesting
+ void addProbeResult(Entry entry, ProbeResult result) {
+ entry.probeHistory.add(result);
+ if (result.isSuccessful()) {
+ entry.successfulProbesInWindow.incrementAndGet();
+ } else {
+ entry.failedProbesInWindow.incrementAndGet();
+ }
+ pruneHistory(entry);
+ }
+
+ @VisibleForTesting
+ void pruneHistory(Entry entry) {
+ Instant windowStart = clock.instant().minus(WINDOW_DURATION);
+ while (!entry.probeHistory.isEmpty()
+ && entry.probeHistory.peek().startTime().isBefore(windowStart)) {
+ ProbeResult removedResult = entry.probeHistory.poll();
+ if (removedResult.isSuccessful()) {
+ entry.successfulProbesInWindow.decrementAndGet();
+ } else {
+ entry.failedProbesInWindow.decrementAndGet();
+ }
+ }
+ }
+
+ /** Checks if a single entry is currently healthy based on its probe history. */
+ @VisibleForTesting
+ boolean isEntryHealthy(Entry entry) {
+ int failedProbes = entry.failedProbesInWindow.get();
+ int totalProbes = failedProbes + entry.successfulProbesInWindow.get();
+
+ if (totalProbes < MIN_PROBES_FOR_EVALUATION) {
+ return true; // Not enough data, assume healthy.
+ }
+
+ double failureRate = ((double) failedProbes / totalProbes) * 100.0;
+ return failureRate < SINGLE_CHANNEL_FAILURE_PERCENT_THRESHOLD;
+ }
+
+ /**
+ * Finds a channel that is an outlier in terms of health.
+ *
+ * @return the entry to be evicted. Returns null if nothing to evict.
+ */
+ @Nullable
+ @VisibleForTesting
+ Entry findOutlierEntry() {
+ List unhealthyEntries =
+ this.entrySupplier.get().stream()
+ .filter(entry -> !isEntryHealthy(entry))
+ .collect(Collectors.toList());
+
+ int poolSize = this.entrySupplier.get().size();
+ if (unhealthyEntries.isEmpty() || poolSize == 0) {
+ return null;
+ }
+
+ // If more than CIRCUITBREAKER_PERCENT of channels are unhealthy we won't evict
+ double unhealthyPercent = (double) unhealthyEntries.size() / poolSize * 100.0;
+ if (unhealthyPercent >= POOLWIDE_BAD_CHANNEL_CIRCUITBREAKER_PERCENT) {
+ return null;
+ }
+
+ return unhealthyEntries.stream()
+ .max(Comparator.comparingInt(entry -> entry.failedProbesInWindow.get()))
+ .orElse(null);
+ }
+
+ /** Periodically detects and removes outlier channels from the pool. */
+ @VisibleForTesting
+ void detectAndRemoveOutlierEntries() {
+ if (clock.instant().isBefore(lastEviction.plus(MIN_EVICTION_INTERVAL))) {
+ // Primitive but effective rate-limiting.
+ return;
+ }
+ Entry outlier = findOutlierEntry();
+ if (outlier != null) {
+ this.lastEviction = clock.instant();
+ outlier.failedProbesInWindow.set(0);
+ outlier.successfulProbesInWindow.set(0);
+ outlier.probeHistory.clear();
+ outlier.getManagedChannel().enterIdle();
+ }
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPrimer.java
new file mode 100644
index 0000000000..ea7cc70175
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPrimer.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.gaxx.grpc;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.InternalApi;
+import com.google.bigtable.v2.PingAndWarmResponse;
+import io.grpc.ManagedChannel;
+
+/**
+ * Hook for priming a channel, either synchronously or by sending the prime request(s)
+ * asynchronously.
+ */
+@InternalApi("For internal use by google-cloud-java clients only")
+public interface ChannelPrimer {
+  /**
+   * Primes the given channel.
+   *
+   * @param channel the channel to prime.
+   */
+  void primeChannel(ManagedChannel channel);
+
+  /**
+   * Sends the prime request(s) on the given channel and returns a future for the outcome.
+   *
+   * @param channel the channel to send the prime request(s) on.
+   * @return a future for the {@link PingAndWarmResponse}; NOTE(review): presumably it fails when
+   *     the request fails; confirm against the implementation.
+   */
+  ApiFuture<PingAndWarmResponse> sendPrimeRequestsAsync(ManagedChannel channel);
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/common/TypeTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/common/TypeTest.java
index 0e103cac4b..d5c51451ca 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/common/TypeTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/common/TypeTest.java
@@ -57,9 +57,10 @@ public void simpleTypes_TypeToString() {
assertThat(Type.Timestamp.create().toString()).isEqualTo("TIMESTAMP");
assertThat(Type.Date.create().toString()).isEqualTo("DATE");
assertThat(Type.SchemalessStruct.create().toString()).isEqualTo("STRUCT");
- assertThat(Type.SchemalessProto.create("MyMessage").toString())
- .isEqualTo("PROTO{messageName=MyMessage}");
- assertThat(Type.SchemalessEnum.create("MyEnum").toString()).isEqualTo("ENUM{enumName=MyEnum}");
+ assertThat(Type.SchemalessProto.create("MyMessage", "my_bundle").toString())
+ .isEqualTo("PROTO{messageName=MyMessage, schemaBundleId=my_bundle}");
+ assertThat(Type.SchemalessEnum.create("MyEnum", "other_bundle").toString())
+ .isEqualTo("ENUM{enumName=MyEnum, schemaBundleId=other_bundle}");
}
@Test
@@ -123,37 +124,48 @@ public void map_equals() {
@Test
public void proto_equals() {
- assertThat(Type.SchemalessProto.create("MyMessage"))
- .isEqualTo(Type.SchemalessProto.create("MyMessage"));
+ assertThat(Type.SchemalessProto.create("MyMessage", "my_bundle"))
+ .isEqualTo(Type.SchemalessProto.create("MyMessage", "my_bundle"));
assertThat(Type.Proto.create(Singer.getDefaultInstance()))
.isEqualTo(Type.Proto.create(Singer.getDefaultInstance()));
- assertThat(Type.SchemalessProto.create("MyMessage"))
- .isNotEqualTo(Type.SchemalessProto.create("AnotherMessage"));
+ assertThat(Type.SchemalessProto.create("MyMessage", "my_bundle"))
+ .isNotEqualTo(Type.SchemalessProto.create("AnotherMessage", "my_bundle"));
+ assertThat(Type.SchemalessProto.create("MyMessage", "my_bundle"))
+ .isNotEqualTo(Type.SchemalessProto.create("MyMessage", "another_bundle"));
assertThat(Type.Proto.create(Singer.getDefaultInstance()))
.isNotEqualTo(Type.Proto.create(Album.getDefaultInstance()));
- assertThat(Type.SchemalessProto.create("com.google.cloud.bigtable.data.v2.test.Singer"))
+ assertThat(
+ Type.SchemalessProto.create(
+ "com.google.cloud.bigtable.data.v2.test.Singer", "my_bundle"))
.isNotEqualTo(Type.Proto.create(Singer.getDefaultInstance()));
assertThat(Type.Proto.create(Singer.getDefaultInstance()))
- .isNotEqualTo(Type.SchemalessProto.create("com.google.cloud.bigtable.data.v2.test.Singer"));
+ .isNotEqualTo(
+ Type.SchemalessProto.create(
+ "com.google.cloud.bigtable.data.v2.test.Singer", "my_bundle"));
}
@Test
public void enum_equals() {
- assertThat(Type.SchemalessEnum.create("MyEnum"))
- .isEqualTo(Type.SchemalessEnum.create("MyEnum"));
+ assertThat(Type.SchemalessEnum.create("MyEnum", "my_bundle"))
+ .isEqualTo(Type.SchemalessEnum.create("MyEnum", "my_bundle"));
assertThat(Type.Enum.create(Genre::forNumber)).isEqualTo(Type.Enum.create(Genre::forNumber));
- assertThat(Type.SchemalessEnum.create("MyEnum"))
- .isNotEqualTo(Type.SchemalessEnum.create("AnotherEnum"));
+ assertThat(Type.SchemalessEnum.create("MyEnum", "my_bundle"))
+ .isNotEqualTo(Type.SchemalessEnum.create("AnotherEnum", "my_bundle"));
+ assertThat(Type.SchemalessEnum.create("MyEnum", "my_bundle"))
+ .isNotEqualTo(Type.SchemalessEnum.create("MyEnum", "another_bundle"));
assertThat(Type.Enum.create(Genre::forNumber))
.isNotEqualTo(Type.Enum.create(Format::forNumber));
- assertThat(Type.SchemalessEnum.create("com.google.cloud.bigtable.data.v2.test.Genre"))
+ assertThat(
+ Type.SchemalessEnum.create("com.google.cloud.bigtable.data.v2.test.Genre", "my_bundle"))
.isNotEqualTo(Type.Enum.create(Genre::forNumber));
assertThat(Type.Enum.create(Genre::forNumber))
- .isNotEqualTo(Type.SchemalessEnum.create("com.google.cloud.bigtable.data.v2.test.Genre"));
+ .isNotEqualTo(
+ Type.SchemalessEnum.create(
+ "com.google.cloud.bigtable.data.v2.test.Genre", "my_bundle"));
}
@Test
@@ -230,13 +242,13 @@ public void schemalessStruct_throwsExceptionOnSchemaAccess() {
@Test
public void schemalessProto_throwsExceptionOnGetParser() {
- SchemalessProto proto = Type.SchemalessProto.create("MyMessage");
+ SchemalessProto proto = Type.SchemalessProto.create("MyMessage", "my_bundle");
assertThrows(UnsupportedOperationException.class, proto::getParserForType);
}
@Test
public void schemalessEnum_throwsExceptionOnGetForNumber() {
- SchemalessEnum myEnum = Type.SchemalessEnum.create("MyEnum");
+ SchemalessEnum myEnum = Type.SchemalessEnum.create("MyEnum", "my_bundle");
assertThrows(UnsupportedOperationException.class, myEnum::getForNumber);
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java
index 42746bbecc..c3d326fbef 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java
@@ -27,7 +27,6 @@
import com.google.api.gax.rpc.WatchdogProvider;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.FeatureFlags;
-import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
@@ -40,6 +39,7 @@
import com.google.common.base.Preconditions;
import com.google.common.io.BaseEncoding;
import io.grpc.Attributes;
+import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
@@ -50,9 +50,10 @@
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.lang.reflect.Method;
-import java.util.LinkedList;
-import java.util.List;
+import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import org.junit.After;
import org.junit.Before;
@@ -87,6 +88,7 @@ public class BigtableDataClientFactoryTest {
private final BlockingQueue setUpAttributes = new LinkedBlockingDeque<>();
private final BlockingQueue terminateAttributes = new LinkedBlockingDeque<>();
private final BlockingQueue requestMetadata = new LinkedBlockingDeque<>();
+ private final ConcurrentMap warmedChannels = new ConcurrentHashMap<>();
@Before
public void setUp() throws IOException {
@@ -101,6 +103,15 @@ public Listener interceptCall(
Metadata headers,
ServerCallHandler next) {
requestMetadata.add(headers);
+
+ // Check if the call is PingAndWarm and mark the channel address as warmed up.
+ if (BigtableGrpc.getPingAndWarmMethod().equals(call.getMethodDescriptor())) {
+ SocketAddress remoteAddr =
+ call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
+ if (remoteAddr != null) {
+ warmedChannels.put(remoteAddr, true);
+ }
+ }
return next.startCall(call, headers);
}
})
@@ -278,21 +289,8 @@ public void testCreateWithRefreshingChannel() throws Exception {
Mockito.verify(executorProvider, Mockito.times(1)).getExecutor();
Mockito.verify(watchdogProvider, Mockito.times(1)).getWatchdog();
- // Make sure that the clients are sharing the same ChannelPool
- assertThat(setUpAttributes).hasSize(poolSize);
-
- // Make sure that prime requests were sent only once per table per connection
- assertThat(service.pingAndWarmRequests).hasSize(poolSize);
- List expectedRequests = new LinkedList<>();
- for (int i = 0; i < poolSize; i++) {
- expectedRequests.add(
- PingAndWarmRequest.newBuilder()
- .setName(InstanceName.format(DEFAULT_PROJECT_ID, DEFAULT_INSTANCE_ID))
- .setAppProfileId(DEFAULT_APP_PROFILE_ID)
- .build());
- }
-
- assertThat(service.pingAndWarmRequests).containsExactly(expectedRequests.toArray());
+ assertThat(warmedChannels).hasSize(poolSize);
+ assertThat(warmedChannels.values()).doesNotContain(false);
// Wait for all the connections to close asynchronously
factory.close();
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/AbstractProtoStructReaderTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/AbstractProtoStructReaderTest.java
index 0c623dc18f..b34c0536bd 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/AbstractProtoStructReaderTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/AbstractProtoStructReaderTest.java
@@ -257,7 +257,8 @@ public void mapField_accessingProto() {
"testField",
mapType(
bytesType(),
- protoType("com.google.cloud.bigtable.data.v2.test.Singer"))))),
+ protoType(
+ "com.google.cloud.bigtable.data.v2.test.Singer", "my_bundle"))))),
Collections.singletonList(
mapValue(mapElement(bytesValue("key"), bytesValue(singer.toByteArray())))));
HashMap expectedMap = new HashMap<>();
@@ -280,7 +281,8 @@ public void mapField_accessingProto() {
"testField",
SqlType.mapOf(
SqlType.bytes(),
- SchemalessProto.create("com.google.cloud.bigtable.data.v2.test.Singer"))));
+ SchemalessProto.create(
+ "com.google.cloud.bigtable.data.v2.test.Singer", "my_bundle"))));
assertThrows(
UnsupportedOperationException.class,
() ->
@@ -288,7 +290,8 @@ public void mapField_accessingProto() {
0,
SqlType.mapOf(
SqlType.bytes(),
- SchemalessProto.create("com.google.cloud.bigtable.data.v2.test.Singer"))));
+ SchemalessProto.create(
+ "com.google.cloud.bigtable.data.v2.test.Singer", "my_bundle"))));
assertThrows(
IllegalStateException.class,
() ->
@@ -319,7 +322,8 @@ public void mapField_accessingEnum() {
"testField",
mapType(
bytesType(),
- enumType("com.google.cloud.bigtable.data.v2.test.Genre"))))),
+ enumType(
+ "com.google.cloud.bigtable.data.v2.test.Genre", "my_bundle"))))),
Collections.singletonList(mapValue(mapElement(bytesValue("key"), int64Value(0)))));
HashMap expectedMap = new HashMap<>();
expectedMap.put(ByteString.copyFromUtf8("key"), Genre.POP);
@@ -340,7 +344,8 @@ public void mapField_accessingEnum() {
"testField",
SqlType.mapOf(
SqlType.bytes(),
- SchemalessEnum.create("com.google.cloud.bigtable.data.v2.test.Genre"))));
+ SchemalessEnum.create(
+ "com.google.cloud.bigtable.data.v2.test.Genre", "my_bundle"))));
assertThrows(
UnsupportedOperationException.class,
() ->
@@ -348,7 +353,8 @@ public void mapField_accessingEnum() {
0,
SqlType.mapOf(
SqlType.bytes(),
- SchemalessEnum.create("com.google.cloud.bigtable.data.v2.test.Genre"))));
+ SchemalessEnum.create(
+ "com.google.cloud.bigtable.data.v2.test.Genre", "my_bundle"))));
assertThrows(
UnsupportedOperationException.class,
() ->
@@ -356,7 +362,8 @@ public void mapField_accessingEnum() {
"testField",
SqlType.mapOf(
SqlType.bytes(),
- SchemalessEnum.create("com.google.cloud.bigtable.data.v2.test.Genre"))));
+ SchemalessEnum.create(
+ "com.google.cloud.bigtable.data.v2.test.Genre", "my_bundle"))));
assertThrows(
UnsupportedOperationException.class,
() ->
@@ -364,7 +371,8 @@ public void mapField_accessingEnum() {
0,
SqlType.mapOf(
SqlType.bytes(),
- SchemalessEnum.create("com.google.cloud.bigtable.data.v2.test.Genre"))));
+ SchemalessEnum.create(
+ "com.google.cloud.bigtable.data.v2.test.Genre", "my_bundle"))));
assertThrows(
IllegalStateException.class,
() -> structWithMap.getMap("testField", SqlType.mapOf(SqlType.bytes(), SqlType.bytes())));
@@ -481,8 +489,8 @@ public static List