protos
}
/**
- * Deletes an schema bundle with the specified schema bundle ID in the specified table. Note that
- * the deletion is prohibited if the schema bundle has deletion_protection field set to true.
+ * Deletes an schema bundle with the specified schema bundle ID in the specified table.
*
* Sample code:
*
@@ -2106,8 +2105,7 @@ public void deleteSchemaBundle(String tableId, String schemaBundleId) {
/**
* Asynchronously deletes an schema bundle with the specified schema bundle ID in the specified
- * table. Note that the deletion is prohibited if the schema bundle has deletion_protection field
- * set to true.
+ * table.
*
*
Sample code:
*
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/CreateSchemaBundleRequest.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/CreateSchemaBundleRequest.java
index ea966d81c4..b6d88e2b53 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/CreateSchemaBundleRequest.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/CreateSchemaBundleRequest.java
@@ -68,8 +68,7 @@ public CreateSchemaBundleRequest setProtoSchemaFile(@Nonnull String protoSchemaF
}
/** Sets the proto schema for this schema bundle. */
- public CreateSchemaBundleRequest setProtoSchema(@Nonnull ByteString protoSchema)
- throws IOException {
+ public CreateSchemaBundleRequest setProtoSchema(@Nonnull ByteString protoSchema) {
Preconditions.checkNotNull(protoSchema, "protoSchema must be set");
requestBuilder.setSchemaBundle(
com.google.bigtable.admin.v2.SchemaBundle.newBuilder()
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java
index 92a984a015..bac1ec4a06 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java
@@ -27,9 +27,9 @@
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.internal.JwtCredentialsWithAudience;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants;
+import com.google.cloud.bigtable.data.v2.stub.metrics.ChannelPoolMetricsTracer;
import com.google.cloud.bigtable.data.v2.stub.metrics.CustomOpenTelemetryMetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.DefaultMetricsProvider;
-import com.google.cloud.bigtable.data.v2.stub.metrics.ErrorCountPerConnectionMetricTracker;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider;
import com.google.cloud.bigtable.gaxx.grpc.BigtableTransportChannelProvider;
@@ -97,8 +97,7 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
: null;
@Nullable OpenTelemetrySdk internalOtel = null;
- @Nullable ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker = null;
-
+ @Nullable ChannelPoolMetricsTracer channelPoolMetricsTracer = null;
// Internal metrics are scoped to the connections, so we need a mutable transportProvider,
// otherwise there is
// no reason to build the internal OtelProvider
@@ -106,10 +105,9 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
internalOtel =
settings.getInternalMetricsProvider().createOtelProvider(settings, credentials);
if (internalOtel != null) {
- // Set up per connection error count tracker if all dependencies are met:
- // a configurable transport provider + otel
- errorCountPerConnectionMetricTracker =
- setupPerConnectionErrorTracer(builder, transportProvider, internalOtel);
+ channelPoolMetricsTracer =
+ new ChannelPoolMetricsTracer(
+ internalOtel, EnhancedBigtableStub.createBuiltinAttributes(builder.build()));
// Configure grpc metrics
configureGrpcOtel(transportProvider, internalOtel);
@@ -137,16 +135,16 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
BigtableTransportChannelProvider btTransportProvider =
BigtableTransportChannelProvider.create(
- (InstantiatingGrpcChannelProvider) transportProvider.build(), channelPrimer);
+ (InstantiatingGrpcChannelProvider) transportProvider.build(),
+ channelPrimer,
+ channelPoolMetricsTracer);
builder.setTransportChannelProvider(btTransportProvider);
}
ClientContext clientContext = ClientContext.create(builder.build());
-
- if (errorCountPerConnectionMetricTracker != null) {
- errorCountPerConnectionMetricTracker.startConnectionErrorCountTracker(
- clientContext.getExecutor());
+ if (channelPoolMetricsTracer != null) {
+ channelPoolMetricsTracer.start(clientContext.getExecutor());
}
return new BigtableClientContext(
@@ -264,27 +262,6 @@ private static void patchCredentials(EnhancedBigtableStubSettings.Builder settin
settings.setCredentialsProvider(FixedCredentialsProvider.create(patchedCreds));
}
- private static ErrorCountPerConnectionMetricTracker setupPerConnectionErrorTracer(
- EnhancedBigtableStubSettings.Builder builder,
- InstantiatingGrpcChannelProvider.Builder transportProvider,
- OpenTelemetry openTelemetry) {
- ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker =
- new ErrorCountPerConnectionMetricTracker(
- openTelemetry, EnhancedBigtableStub.createBuiltinAttributes(builder.build()));
- ApiFunction oldChannelConfigurator =
- transportProvider.getChannelConfigurator();
- transportProvider.setChannelConfigurator(
- managedChannelBuilder -> {
- managedChannelBuilder.intercept(errorCountPerConnectionMetricTracker.getInterceptor());
-
- if (oldChannelConfigurator != null) {
- managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
- }
- return managedChannelBuilder;
- });
- return errorCountPerConnectionMetricTracker;
- }
-
private static void setupCookieHolder(
InstantiatingGrpcChannelProvider.Builder transportProvider) {
ApiFunction oldChannelConfigurator =
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
index 31d6f76055..f4572333c9 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
@@ -64,6 +64,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -113,6 +114,10 @@ public class EnhancedBigtableStubSettings extends StubSettings TRANSPORT_ZONE = AttributeKey.stringKey("transport_zone");
static final AttributeKey TRANSPORT_SUBZONE = AttributeKey.stringKey("transport_subzone");
+ // gRPC attribute keys
+ // Note that these attributes keys from transformed from
+ // A.B.C to A_B_C before exporting to Cloud Monitoring.
+ static final AttributeKey GRPC_LB_BACKEND_SERVICE_KEY =
+ AttributeKey.stringKey("grpc.lb.backend_service");
+ static final AttributeKey GRPC_DISCONNECT_ERROR_KEY =
+ AttributeKey.stringKey("grpc.disconnect_error");
+ static final AttributeKey GRPC_LB_LOCALITY_KEY =
+ AttributeKey.stringKey("grpc.lb.locality");
+ static final AttributeKey GRPC_TARGET_KEY = AttributeKey.stringKey("grpc.target");
+ static final AttributeKey GRPC_SECURITY_LEVEL_KEY =
+ AttributeKey.stringKey("grpc.security_level");
+ static final AttributeKey GRPC_METHOD_KEY = AttributeKey.stringKey("grpc.method");
+ static final AttributeKey GRPC_STATUS_KEY = AttributeKey.stringKey("grpc.status");
+ static final AttributeKey GRPC_LB_RLS_DATA_PLANE_TARGET_KEY =
+ AttributeKey.stringKey("grpc.lb.rls.data_plane_target");
+ static final AttributeKey GRPC_LB_PICK_RESULT_KEY =
+ AttributeKey.stringKey("grpc.lb.pick_result");
+ static final AttributeKey GRPC_LB_RLS_SERVER_TARGET_KEY =
+ AttributeKey.stringKey("grpc.lb.rls.server_target");
+ static final AttributeKey GRPC_XDS_SERVER_KEY = AttributeKey.stringKey("grpc.xds.server");
+ static final AttributeKey GRPC_XDS_RESOURCE_TYPE_KEY =
+ AttributeKey.stringKey("grpc.xds.resource_type");
+
public static final String METER_NAME = "bigtable.googleapis.com/internal/client/";
// Metric names
@@ -70,37 +94,75 @@ public class BuiltinMetricsConstants {
static final String REMAINING_DEADLINE_NAME = "remaining_deadline";
static final String CLIENT_BLOCKING_LATENCIES_NAME = "throttling_latencies";
static final String PER_CONNECTION_ERROR_COUNT_NAME = "per_connection_error_count";
+ static final String OUTSTANDING_RPCS_PER_CHANNEL_NAME = "connection_pool/outstanding_rpcs";
// Start allow list of metrics that will be exported as internal
public static final Map> GRPC_METRICS =
ImmutableMap.>builder()
.put(
"grpc.client.attempt.duration",
- ImmutableSet.of("grpc.lb.locality", "grpc.method", "grpc.target", "grpc.status"))
+ ImmutableSet.of(
+ GRPC_LB_LOCALITY_KEY.getKey(),
+ GRPC_METHOD_KEY.getKey(),
+ GRPC_TARGET_KEY.getKey(),
+ GRPC_STATUS_KEY.getKey()))
.put(
"grpc.lb.rls.default_target_picks",
- ImmutableSet.of("grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"))
+ ImmutableSet.of(
+ GRPC_LB_RLS_DATA_PLANE_TARGET_KEY.getKey(), GRPC_LB_PICK_RESULT_KEY.getKey()))
.put(
"grpc.lb.rls.target_picks",
ImmutableSet.of(
- "grpc.target",
- "grpc.lb.rls.server_target",
- "grpc.lb.rls.data_plane_target",
- "grpc.lb.pick_result"))
+ GRPC_TARGET_KEY.getKey(),
+ GRPC_LB_RLS_SERVER_TARGET_KEY.getKey(),
+ GRPC_LB_RLS_DATA_PLANE_TARGET_KEY.getKey(),
+ GRPC_LB_PICK_RESULT_KEY.getKey()))
.put(
"grpc.lb.rls.failed_picks",
- ImmutableSet.of("grpc.target", "grpc.lb.rls.server_target"))
+ ImmutableSet.of(GRPC_TARGET_KEY.getKey(), GRPC_LB_RLS_SERVER_TARGET_KEY.getKey()))
// TODO: "grpc.xds_client.connected"
- .put("grpc.xds_client.server_failure", ImmutableSet.of("grpc.target", "grpc.xds.server"))
+ .put(
+ "grpc.xds_client.server_failure",
+ ImmutableSet.of(GRPC_TARGET_KEY.getKey(), GRPC_XDS_SERVER_KEY.getKey()))
// TODO: "grpc.xds_client.resource_updates_valid",
.put(
"grpc.xds_client.resource_updates_invalid",
- ImmutableSet.of("grpc.target", "grpc.xds.server", "grpc.xds.resource_type"))
+ ImmutableSet.of(
+ GRPC_TARGET_KEY.getKey(),
+ GRPC_XDS_SERVER_KEY.getKey(),
+ GRPC_XDS_RESOURCE_TYPE_KEY.getKey()))
// TODO: "grpc.xds_client.resources"
+ // gRPC subchannel metrics
+ .put(
+ "grpc.subchannel.disconnections",
+ ImmutableSet.of(
+ GRPC_LB_BACKEND_SERVICE_KEY.getKey(),
+ GRPC_DISCONNECT_ERROR_KEY.getKey(),
+ GRPC_LB_LOCALITY_KEY.getKey(),
+ GRPC_TARGET_KEY.getKey()))
+ .put(
+ "grpc.subchannel.connection_attempts_succeeded",
+ ImmutableSet.of(
+ GRPC_LB_BACKEND_SERVICE_KEY.getKey(),
+ GRPC_LB_LOCALITY_KEY.getKey(),
+ GRPC_TARGET_KEY.getKey()))
+ .put(
+ "grpc.subchannel.connection_attempts_failed",
+ ImmutableSet.of(
+ GRPC_LB_BACKEND_SERVICE_KEY.getKey(),
+ GRPC_LB_LOCALITY_KEY.getKey(),
+ GRPC_TARGET_KEY.getKey()))
+ .put(
+ "grpc.subchannel.open_connections",
+ ImmutableSet.of(
+ GRPC_LB_BACKEND_SERVICE_KEY.getKey(),
+ GRPC_LB_LOCALITY_KEY.getKey(),
+ GRPC_SECURITY_LEVEL_KEY.getKey(),
+ GRPC_TARGET_KEY.getKey()))
.build();
public static final Set INTERNAL_METRICS =
- ImmutableSet.of(PER_CONNECTION_ERROR_COUNT_NAME).stream()
+ ImmutableSet.of(PER_CONNECTION_ERROR_COUNT_NAME, OUTSTANDING_RPCS_PER_CHANNEL_NAME).stream()
.map(m -> METER_NAME + m)
.collect(ImmutableSet.toImmutableSet());
// End allow list of metrics that will be exported
@@ -140,6 +202,15 @@ public class BuiltinMetricsConstants {
500_000.0,
1_000_000.0));
+ // Buckets for outstanding RPCs per channel, max ~100
+ private static final Aggregation AGGREGATION_OUTSTANDING_RPCS_HISTOGRAM =
+ Aggregation.explicitBucketHistogram(
+ ImmutableList.of(
+ 0.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0, 50.0, 55.0, 60.0, 65.0,
+ 70.0, 75.0, 80.0, 85.0, 90.0, 95.0, 100.0, 105.0, 110.0, 115.0, 120.0, 125.0, 130.0,
+ 135.0, 140.0, 145.0, 150.0, 155.0, 160.0, 165.0, 170.0, 175.0, 180.0, 185.0, 190.0,
+ 195.0, 200.0));
+
static final Set COMMON_ATTRIBUTES =
ImmutableSet.of(
BIGTABLE_PROJECT_ID_KEY,
@@ -181,6 +252,7 @@ static void defineView(
viewMap.put(selector, view);
}
+ // uses cloud.BigtableClient schema
public static Map getInternalViews() {
ImmutableMap.Builder views = ImmutableMap.builder();
defineView(
@@ -192,6 +264,15 @@ public static Map getInternalViews() {
ImmutableSet.builder()
.add(BIGTABLE_PROJECT_ID_KEY, INSTANCE_ID_KEY, APP_PROFILE_KEY, CLIENT_NAME_KEY)
.build());
+ defineView(
+ views,
+ OUTSTANDING_RPCS_PER_CHANNEL_NAME,
+ AGGREGATION_OUTSTANDING_RPCS_HISTOGRAM,
+ InstrumentType.HISTOGRAM,
+ "1",
+ ImmutableSet.builder()
+ .add(BIGTABLE_PROJECT_ID_KEY, INSTANCE_ID_KEY, APP_PROFILE_KEY, CLIENT_NAME_KEY)
+ .build());
return views.build();
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ChannelPoolMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ChannelPoolMetricsTracer.java
new file mode 100644
index 0000000000..e0b55f3272
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ChannelPoolMetricsTracer.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METER_NAME;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.OUTSTANDING_RPCS_PER_CHANNEL_NAME;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.PER_CONNECTION_ERROR_COUNT_NAME;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.gaxx.grpc.BigtableChannelObserver;
+import com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPoolObserver;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+
+@InternalApi("For internal use only")
+public class ChannelPoolMetricsTracer implements Runnable {
+ private static final Logger logger = Logger.getLogger(ChannelPoolMetricsTracer.class.getName());
+
+ private static final int SAMPLING_PERIOD_SECONDS = 60;
+ private final LongHistogram outstandingRpcsHistogram;
+ private final LongHistogram perConnectionErrorCountHistogram;
+
+ private final AtomicReference bigtableChannelInsightsProviderRef =
+ new AtomicReference<>();
+ private final AtomicReference lbPolicyRef = new AtomicReference<>("ROUND_ROBIN");
+ private final Attributes commonAttrs;
+
+ // Attributes for unary and streaming RPCs, built on demand in run()
+ @Nullable private Attributes unaryAttributes;
+ @Nullable private Attributes streamingAttributes;
+
+ public ChannelPoolMetricsTracer(OpenTelemetry openTelemetry, Attributes commonAttrs) {
+ Meter meter = openTelemetry.getMeter(METER_NAME);
+ this.commonAttrs = commonAttrs;
+ this.outstandingRpcsHistogram =
+ meter
+ .histogramBuilder(OUTSTANDING_RPCS_PER_CHANNEL_NAME)
+ .ofLongs()
+ .setDescription(
+ "A distribution of the number of outstanding RPCs per connection in the client"
+ + " pool, sampled periodically.")
+ .setUnit("1")
+ .build();
+
+ this.perConnectionErrorCountHistogram =
+ meter
+ .histogramBuilder(PER_CONNECTION_ERROR_COUNT_NAME)
+ .ofLongs()
+ .setDescription("Distribution of counts of channels per 'error count per minute'.")
+ .setUnit("1")
+ .build();
+ }
+
+ /**
+ * Registers the provider for the channel pool entries. This should be called by the component
+ * that creates the BigtableChannelPool.
+ */
+ public void registerChannelInsightsProvider(BigtableChannelPoolObserver channelInsightsProvider) {
+ this.bigtableChannelInsightsProviderRef.set(channelInsightsProvider);
+ }
+
+ /** Register the current lb policy * */
+ public void registerLoadBalancingStrategy(String lbPolicy) {
+ this.lbPolicyRef.set(lbPolicy);
+ }
+
+ /** Starts the periodic collection. */
+ public ScheduledFuture> start(ScheduledExecutorService scheduler) {
+ return scheduler.scheduleAtFixedRate(
+ this, SAMPLING_PERIOD_SECONDS, SAMPLING_PERIOD_SECONDS, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void run() {
+ BigtableChannelPoolObserver channelInsightsProvider = bigtableChannelInsightsProviderRef.get();
+ if (channelInsightsProvider == null) {
+ logger.warning("No Bigtable ChannelPoolObserver available");
+ return; // Not registered yet
+ }
+ String lbPolicy = lbPolicyRef.get();
+
+ // Build attributes if they haven't been built yet.
+ if (unaryAttributes == null || streamingAttributes == null) {
+ Attributes baseAttrs = commonAttrs.toBuilder().put("lb_policy", lbPolicy).build();
+ this.unaryAttributes = baseAttrs.toBuilder().put("streaming", false).build();
+ this.streamingAttributes = baseAttrs.toBuilder().put("streaming", true).build();
+ }
+ List extends BigtableChannelObserver> channelInsights =
+ channelInsightsProvider.getChannelInfos();
+ if (channelInsights == null || channelInsights.isEmpty()) {
+ return;
+ }
+ for (BigtableChannelObserver info : channelInsights) {
+ String transportTypeValue = info.isAltsChannel() ? "DIRECTPATH" : "CLOUDPATH";
+ this.unaryAttributes =
+ this.unaryAttributes.toBuilder().put("transport_type", transportTypeValue).build();
+ this.streamingAttributes =
+ this.streamingAttributes.toBuilder().put("transport_type", transportTypeValue).build();
+
+ long currentOutstandingUnaryRpcs = info.getOutstandingUnaryRpcs();
+ long currentOutstandingStreamingRpcs = info.getOutstandingStreamingRpcs();
+ // Record outstanding unary RPCs with streaming=false
+ outstandingRpcsHistogram.record(currentOutstandingUnaryRpcs, unaryAttributes);
+ // Record outstanding streaming RPCs with streaming=true
+ outstandingRpcsHistogram.record(currentOutstandingStreamingRpcs, streamingAttributes);
+
+ long errors = info.getAndResetErrorCount();
+ perConnectionErrorCountHistogram.record(errors, commonAttrs);
+ }
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ConnectionErrorCountInterceptor.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ConnectionErrorCountInterceptor.java
deleted file mode 100644
index 17fcf9018e..0000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ConnectionErrorCountInterceptor.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2024 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub.metrics;
-
-import io.grpc.CallOptions;
-import io.grpc.Channel;
-import io.grpc.ClientCall;
-import io.grpc.ClientInterceptor;
-import io.grpc.ForwardingClientCall;
-import io.grpc.ForwardingClientCallListener;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
-import io.grpc.Status;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/** An interceptor which counts the number of failed responses for a channel. */
-class ConnectionErrorCountInterceptor implements ClientInterceptor {
- private static final Logger LOG =
- Logger.getLogger(ConnectionErrorCountInterceptor.class.toString());
- private final LongAdder numOfErrors;
- private final LongAdder numOfSuccesses;
-
- ConnectionErrorCountInterceptor() {
- numOfErrors = new LongAdder();
- numOfSuccesses = new LongAdder();
- }
-
- @Override
- public ClientCall interceptCall(
- MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) {
- return new ForwardingClientCall.SimpleForwardingClientCall(
- channel.newCall(methodDescriptor, callOptions)) {
- @Override
- public void start(Listener responseListener, Metadata headers) {
- super.start(
- new ForwardingClientCallListener.SimpleForwardingClientCallListener(
- responseListener) {
- @Override
- public void onClose(Status status, Metadata trailers) {
- // Connection accounting is non-critical, so we log the exception, but let normal
- // processing proceed.
- try {
- handleOnCloseUnsafe(status);
- } catch (Throwable t) {
- if (t instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- LOG.log(
- Level.WARNING, "Unexpected error while updating connection error stats", t);
- }
- super.onClose(status, trailers);
- }
-
- private void handleOnCloseUnsafe(Status status) {
- if (status.isOk()) {
- numOfSuccesses.increment();
- } else {
- numOfErrors.increment();
- }
- }
- },
- headers);
- }
- };
- }
-
- long getAndResetNumOfErrors() {
- return numOfErrors.sumThenReset();
- }
-
- long getAndResetNumOfSuccesses() {
- return numOfSuccesses.sumThenReset();
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionMetricTracker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionMetricTracker.java
deleted file mode 100644
index a891df9509..0000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionMetricTracker.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2024 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub.metrics;
-
-import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METER_NAME;
-import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.PER_CONNECTION_ERROR_COUNT_NAME;
-
-import com.google.api.core.InternalApi;
-import io.grpc.ClientInterceptor;
-import io.opentelemetry.api.OpenTelemetry;
-import io.opentelemetry.api.common.Attributes;
-import io.opentelemetry.api.metrics.LongHistogram;
-import io.opentelemetry.api.metrics.Meter;
-import java.util.Collections;
-import java.util.Set;
-import java.util.WeakHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/* Background task that goes through all connections and updates the errors_per_connection metric. */
-@InternalApi("For internal use only")
-public class ErrorCountPerConnectionMetricTracker implements Runnable {
-
- private static final Integer PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS = 60;
-
- private final LongHistogram perConnectionErrorCountHistogram;
- private final Attributes attributes;
-
- private final Set connectionErrorCountInterceptors;
- private final Object interceptorsLock = new Object();
-
- public ErrorCountPerConnectionMetricTracker(OpenTelemetry openTelemetry, Attributes attributes) {
- connectionErrorCountInterceptors =
- Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
-
- Meter meter = openTelemetry.getMeter(METER_NAME);
-
- perConnectionErrorCountHistogram =
- meter
- .histogramBuilder(PER_CONNECTION_ERROR_COUNT_NAME)
- .ofLongs()
- .setDescription("Distribution of counts of channels per 'error count per minute'.")
- .setUnit("1")
- .build();
-
- this.attributes = attributes;
- }
-
- public void startConnectionErrorCountTracker(ScheduledExecutorService scheduler) {
- scheduler.scheduleAtFixedRate(
- this, 0, PER_CONNECTION_ERROR_COUNT_PERIOD_SECONDS, TimeUnit.SECONDS);
- }
-
- public ClientInterceptor getInterceptor() {
- ConnectionErrorCountInterceptor connectionErrorCountInterceptor =
- new ConnectionErrorCountInterceptor();
- synchronized (interceptorsLock) {
- connectionErrorCountInterceptors.add(connectionErrorCountInterceptor);
- }
- return connectionErrorCountInterceptor;
- }
-
- @Override
- public void run() {
- synchronized (interceptorsLock) {
- for (ConnectionErrorCountInterceptor interceptor : connectionErrorCountInterceptors) {
- long errors = interceptor.getAndResetNumOfErrors();
- long successes = interceptor.getAndResetNumOfSuccesses();
- // We avoid keeping track of inactive connections (i.e., without any failed or successful
- // requests).
- if (errors > 0 || successes > 0) {
- // TODO: add a metric to also keep track of the number of successful requests per each
- // connection.
- perConnectionErrorCountHistogram.record(errors, attributes);
- }
- }
- }
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelObserver.java
new file mode 100644
index 0000000000..a718f5fa06
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelObserver.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.gaxx.grpc;
+
+import com.google.api.core.InternalApi;
+
+/** Provides observability about a single channel in the channel pool. */
+@InternalApi
+public interface BigtableChannelObserver {
+ /** Gets the current number of outstanding Unary RPCs on this channel. */
+ int getOutstandingUnaryRpcs();
+
+ /** Gets the current number of outstanding Streaming RPCs on this channel. */
+ int getOutstandingStreamingRpcs();
+
+ /** Get the current number of errors request count since the last observed period */
+ long getAndResetErrorCount();
+
+ /** Get the current number of successful requests since the last observed period */
+ long getAndResetSuccessCount();
+
+ boolean isAltsChannel();
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java
index 173722f2f4..9fcc0ee151 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java
@@ -30,6 +30,7 @@
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
+import io.grpc.alts.AltsContextUtil;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
@@ -57,7 +58,7 @@
* Internal API
*/
@InternalApi
-public class BigtableChannelPool extends ManagedChannel {
+public class BigtableChannelPool extends ManagedChannel implements BigtableChannelPoolObserver {
@VisibleForTesting
static final Logger LOG = Logger.getLogger(BigtableChannelPool.class.getName());
@@ -196,7 +197,7 @@ private int pickEntryIndexLeastInFlight() {
for (int i = 0; i < localEntries.size(); i++) {
Entry entry = localEntries.get(i);
- int rpcs = entry.outstandingRpcs.get();
+ int rpcs = entry.totalOutstandingRpcs();
if (rpcs < minRpcs) {
minRpcs = rpcs;
candidates.clear();
@@ -222,7 +223,7 @@ private int pickEntryIndexPowerOfTwoLeastInFlight() {
Entry entry1 = localEntries.get(choice1);
Entry entry2 = localEntries.get(choice2);
- return entry1.outstandingRpcs.get() < entry2.outstandingRpcs.get() ? choice1 : choice2;
+ return entry1.totalOutstandingRpcs() < entry2.totalOutstandingRpcs() ? choice1 : choice2;
}
Channel getChannel(int index) {
@@ -471,7 +472,7 @@ void refresh() {
* Get and retain a Channel Entry. The returned Entry will have its rpc count incremented,
* preventing it from getting recycled.
*/
- private Entry getRetainedEntry(int affinity) {
+ private Entry getRetainedEntry(int affinity, boolean isStreaming) {
// If an entry is not retainable, that usually means that it's about to be replaced and if we
// retry we should get a new useable entry.
// The maximum number of concurrent calls to this method for any given time span is at most 2,
@@ -479,7 +480,7 @@ private Entry getRetainedEntry(int affinity) {
// code evolving
for (int i = 0; i < 5; i++) {
Entry entry = getEntry(affinity);
- if (entry.retain()) {
+ if (entry.retain(isStreaming)) {
return entry;
}
}
@@ -507,8 +508,14 @@ private Entry getEntry(int affinity) {
return localEntries.get(index);
}
+ /** Gets the current list of BigtableChannelInsight objects. */
+ @Override
+ public List extends BigtableChannelObserver> getChannelInfos() {
+ return entries.get();
+ }
+
/** Bundles a gRPC {@link ManagedChannel} with some usage accounting. */
- static class Entry {
+ static class Entry implements BigtableChannelObserver {
private final ManagedChannel channel;
/**
@@ -525,9 +532,16 @@ static class Entry {
* outstanding RPCs has to happen when the ClientCall is closed or the ClientCall failed to
* start.
*/
- @VisibleForTesting final AtomicInteger outstandingRpcs = new AtomicInteger(0);
+ @VisibleForTesting final AtomicReference isAltsHolder = new AtomicReference<>(null);
+
+ @VisibleForTesting final AtomicInteger errorCount = new AtomicInteger(0);
+ @VisibleForTesting final AtomicInteger successCount = new AtomicInteger(0);
+ @VisibleForTesting final AtomicInteger outstandingUnaryRpcs = new AtomicInteger(0);
+
+ @VisibleForTesting final AtomicInteger outstandingStreamingRpcs = new AtomicInteger(0);
- private final AtomicInteger maxOutstanding = new AtomicInteger();
+ private final AtomicInteger maxOutstandingUnaryRpcs = new AtomicInteger();
+ private final AtomicInteger maxOutstandingStreamingRpcs = new AtomicInteger();
/** Queue storing the last 5 minutes of probe results */
@VisibleForTesting
@@ -551,12 +565,26 @@ static class Entry {
this.channel = channel;
}
+ void checkAndSetIsAlts(ClientCall, ?> call) {
+ boolean currentIsAlts = AltsContextUtil.check(call);
+ isAltsHolder.set(currentIsAlts);
+ }
+
ManagedChannel getManagedChannel() {
return this.channel;
}
+ @VisibleForTesting
+ int totalOutstandingRpcs() {
+ return outstandingUnaryRpcs.get() + outstandingStreamingRpcs.get();
+ }
+
int getAndResetMaxOutstanding() {
- return maxOutstanding.getAndSet(outstandingRpcs.get());
+ int currentUnary = outstandingUnaryRpcs.get();
+ int currentStreaming = outstandingStreamingRpcs.get();
+ int prevMaxUnary = maxOutstandingUnaryRpcs.getAndSet(currentUnary);
+ int prevMaxStreaming = maxOutstandingStreamingRpcs.getAndSet(currentStreaming);
+ return prevMaxStreaming + prevMaxUnary;
}
/**
@@ -565,19 +593,16 @@ int getAndResetMaxOutstanding() {
* channel has been successfully retained and it is the responsibility of the caller to release
* it.
*/
- private boolean retain() {
- // register desire to start RPC
- int currentOutstanding = outstandingRpcs.incrementAndGet();
-
- // Rough bookkeeping
- int prevMax = maxOutstanding.get();
- if (currentOutstanding > prevMax) {
- maxOutstanding.incrementAndGet();
- }
-
+ @VisibleForTesting
+ boolean retain(boolean isStreaming) {
+ AtomicInteger counter = isStreaming ? outstandingStreamingRpcs : outstandingUnaryRpcs;
+ AtomicInteger maxCounter =
+ isStreaming ? maxOutstandingStreamingRpcs : maxOutstandingUnaryRpcs;
+ int currentOutstanding = counter.incrementAndGet();
+ maxCounter.accumulateAndGet(currentOutstanding, Math::max);
// abort if the channel is closing
if (shutdownRequested.get()) {
- release();
+ release(isStreaming);
return false;
}
return true;
@@ -587,15 +612,19 @@ private boolean retain() {
* Notify the channel that the number of outstanding RPCs has decreased. If shutdown has been
* previously requested, this method will shutdown the channel if its the last outstanding RPC.
*/
- private void release() {
- int newCount = outstandingRpcs.decrementAndGet();
+ void release(boolean isStreaming) {
+ int newCount =
+ isStreaming
+ ? outstandingStreamingRpcs.decrementAndGet()
+ : outstandingUnaryRpcs.decrementAndGet();
if (newCount < 0) {
LOG.log(Level.WARNING, "Bug! Reference count is negative (" + newCount + ")!");
}
- // Must check outstandingRpcs after shutdownRequested (in reverse order of retain()) to ensure
+ // Must check toalOutstandingRpcs after shutdownRequested (in reverse order of retain()) to
+ // ensure
// mutual exclusion.
- if (shutdownRequested.get() && outstandingRpcs.get() == 0) {
+ if (shutdownRequested.get() && totalOutstandingRpcs() == 0) {
shutdown();
}
}
@@ -606,7 +635,7 @@ private void release() {
*/
private void requestShutdown() {
shutdownRequested.set(true);
- if (outstandingRpcs.get() == 0) {
+ if (totalOutstandingRpcs() == 0) {
shutdown();
}
}
@@ -617,6 +646,43 @@ private void shutdown() {
channel.shutdown();
}
}
+
+ /** Gets the current number of outstanding Unary RPCs on this channel. */
+ @Override
+ public int getOutstandingUnaryRpcs() {
+ return outstandingUnaryRpcs.get();
+ }
+
+ @Override
+ public int getOutstandingStreamingRpcs() {
+ return outstandingStreamingRpcs.get();
+ }
+
+ /** Get the current number of errors request count since the last observed period */
+ @Override
+ public long getAndResetErrorCount() {
+ return errorCount.getAndSet(0);
+ }
+
+ /** Get the current number of successful requests since the last observed period */
+ @Override
+ public long getAndResetSuccessCount() {
+ return successCount.getAndSet(0);
+ }
+
+ @Override
+ public boolean isAltsChannel() {
+ Boolean val = isAltsHolder.get();
+ return val != null && val;
+ }
+
+ void incrementErrorCount() {
+ errorCount.incrementAndGet();
+ }
+
+ void incrementSuccessCount() {
+ successCount.incrementAndGet();
+ }
}
/** Thin wrapper to ensure that new calls are properly reference counted. */
@@ -635,8 +701,11 @@ public String authority() {
@Override
public ClientCall newCall(
MethodDescriptor methodDescriptor, CallOptions callOptions) {
- Entry entry = getRetainedEntry(index);
- return new ReleasingClientCall<>(entry.channel.newCall(methodDescriptor, callOptions), entry);
+ boolean isStreaming =
+ methodDescriptor.getType() == MethodDescriptor.MethodType.SERVER_STREAMING;
+ Entry entry = getRetainedEntry(index, isStreaming);
+ return new ReleasingClientCall<>(
+ entry.channel.newCall(methodDescriptor, callOptions), entry, isStreaming);
}
}
@@ -644,12 +713,14 @@ public ClientCall newCall(
static class ReleasingClientCall extends SimpleForwardingClientCall {
@Nullable private CancellationException cancellationException;
final Entry entry;
+ private final boolean isStreaming;
private final AtomicBoolean wasClosed = new AtomicBoolean();
private final AtomicBoolean wasReleased = new AtomicBoolean();
- public ReleasingClientCall(ClientCall delegate, Entry entry) {
+ public ReleasingClientCall(ClientCall delegate, Entry entry, boolean isStreaming) {
super(delegate);
this.entry = entry;
+ this.isStreaming = isStreaming;
}
@Override
@@ -658,6 +729,8 @@ public void start(Listener responseListener, Metadata headers) {
throw new IllegalStateException("Call is already cancelled", cancellationException);
}
try {
+ entry.checkAndSetIsAlts(delegate());
+
super.start(
new SimpleForwardingClientCallListener(responseListener) {
@Override
@@ -670,10 +743,16 @@ public void onClose(Status status, Metadata trailers) {
return;
}
try {
+ // status for increment success and error count
+ if (status.isOk()) {
+ entry.incrementSuccessCount();
+ } else {
+ entry.incrementErrorCount();
+ }
super.onClose(status, trailers);
} finally {
if (wasReleased.compareAndSet(false, true)) {
- entry.release();
+ entry.release(isStreaming);
} else {
LOG.log(
Level.WARNING,
@@ -687,7 +766,7 @@ public void onClose(Status status, Metadata trailers) {
} catch (Exception e) {
// In case start failed, make sure to release
if (wasReleased.compareAndSet(false, true)) {
- entry.release();
+ entry.release(isStreaming);
} else {
LOG.log(
Level.WARNING,
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolObserver.java
new file mode 100644
index 0000000000..0b6d3c8664
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolObserver.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.gaxx.grpc;
+
+import com.google.api.core.InternalApi;
+import java.util.List;
+
+@InternalApi
+@FunctionalInterface
+public interface BigtableChannelPoolObserver {
+ /** Gets the current list of BigtableChannelInfo objects. */
+ List extends BigtableChannelObserver> getChannelInfos();
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java
index ba18994619..13340c4086 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java
@@ -23,12 +23,14 @@
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
+import com.google.cloud.bigtable.data.v2.stub.metrics.ChannelPoolMetricsTracer;
import com.google.common.base.Preconditions;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import javax.annotation.Nullable;
/**
* An instance of TransportChannelProvider that provides a TransportChannel through a supplied
@@ -39,12 +41,15 @@ public final class BigtableTransportChannelProvider implements TransportChannelP
private final InstantiatingGrpcChannelProvider delegate;
private final ChannelPrimer channelPrimer;
+ @Nullable private final ChannelPoolMetricsTracer channelPoolMetricsTracer;
private BigtableTransportChannelProvider(
InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider,
- ChannelPrimer channelPrimer) {
+ ChannelPrimer channelPrimer,
+ ChannelPoolMetricsTracer channelPoolMetricsTracer) {
delegate = Preconditions.checkNotNull(instantiatingGrpcChannelProvider);
this.channelPrimer = channelPrimer;
+ this.channelPoolMetricsTracer = channelPoolMetricsTracer;
}
@Override
@@ -66,7 +71,8 @@ public BigtableTransportChannelProvider withExecutor(ScheduledExecutorService ex
public BigtableTransportChannelProvider withExecutor(Executor executor) {
InstantiatingGrpcChannelProvider newChannelProvider =
(InstantiatingGrpcChannelProvider) delegate.withExecutor(executor);
- return new BigtableTransportChannelProvider(newChannelProvider, channelPrimer);
+ return new BigtableTransportChannelProvider(
+ newChannelProvider, channelPrimer, channelPoolMetricsTracer);
}
@Override
@@ -78,7 +84,8 @@ public boolean needsHeaders() {
public BigtableTransportChannelProvider withHeaders(Map headers) {
InstantiatingGrpcChannelProvider newChannelProvider =
(InstantiatingGrpcChannelProvider) delegate.withHeaders(headers);
- return new BigtableTransportChannelProvider(newChannelProvider, channelPrimer);
+ return new BigtableTransportChannelProvider(
+ newChannelProvider, channelPrimer, channelPoolMetricsTracer);
}
@Override
@@ -90,7 +97,8 @@ public boolean needsEndpoint() {
public TransportChannelProvider withEndpoint(String endpoint) {
InstantiatingGrpcChannelProvider newChannelProvider =
(InstantiatingGrpcChannelProvider) delegate.withEndpoint(endpoint);
- return new BigtableTransportChannelProvider(newChannelProvider, channelPrimer);
+ return new BigtableTransportChannelProvider(
+ newChannelProvider, channelPrimer, channelPoolMetricsTracer);
}
@Deprecated
@@ -104,7 +112,8 @@ public boolean acceptsPoolSize() {
public TransportChannelProvider withPoolSize(int size) {
InstantiatingGrpcChannelProvider newChannelProvider =
(InstantiatingGrpcChannelProvider) delegate.withPoolSize(size);
- return new BigtableTransportChannelProvider(newChannelProvider, channelPrimer);
+ return new BigtableTransportChannelProvider(
+ newChannelProvider, channelPrimer, channelPoolMetricsTracer);
}
/** Expected to only be called once when BigtableClientContext is created */
@@ -136,6 +145,12 @@ public TransportChannel getTransportChannel() throws IOException {
BigtableChannelPool btChannelPool =
BigtableChannelPool.create(btPoolSettings, channelFactory, channelPrimer);
+ if (channelPoolMetricsTracer != null) {
+ channelPoolMetricsTracer.registerChannelInsightsProvider(btChannelPool::getChannelInfos);
+ channelPoolMetricsTracer.registerLoadBalancingStrategy(
+ btPoolSettings.getLoadBalancingStrategy().name());
+ }
+
return GrpcTransportChannel.create(btChannelPool);
}
@@ -153,13 +168,16 @@ public boolean needsCredentials() {
public TransportChannelProvider withCredentials(Credentials credentials) {
InstantiatingGrpcChannelProvider newChannelProvider =
(InstantiatingGrpcChannelProvider) delegate.withCredentials(credentials);
- return new BigtableTransportChannelProvider(newChannelProvider, channelPrimer);
+ return new BigtableTransportChannelProvider(
+ newChannelProvider, channelPrimer, channelPoolMetricsTracer);
}
/** Creates a BigtableTransportChannelProvider. */
public static BigtableTransportChannelProvider create(
InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider,
- ChannelPrimer channelPrimer) {
- return new BigtableTransportChannelProvider(instantiatingGrpcChannelProvider, channelPrimer);
+ ChannelPrimer channelPrimer,
+ ChannelPoolMetricsTracer outstandingRpcsMetricTracke) {
+ return new BigtableTransportChannelProvider(
+ instantiatingGrpcChannelProvider, channelPrimer, outstandingRpcsMetricTracke);
}
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ExecuteQueryIT.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ExecuteQueryIT.java
index fc4aba5768..90bbd65fd5 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ExecuteQueryIT.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/ExecuteQueryIT.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.assertThrows;
import com.google.cloud.Date;
+import com.google.cloud.bigtable.admin.v2.models.CreateSchemaBundleRequest;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.TableId;
@@ -28,10 +29,14 @@
import com.google.cloud.bigtable.data.v2.models.sql.ResultSet;
import com.google.cloud.bigtable.data.v2.models.sql.SqlType;
import com.google.cloud.bigtable.data.v2.models.sql.Struct;
+import com.google.cloud.bigtable.data.v2.test.AlbumProto.Album;
+import com.google.cloud.bigtable.data.v2.test.SingerProto.Genre;
+import com.google.cloud.bigtable.data.v2.test.SingerProto.Singer;
import com.google.cloud.bigtable.test_helpers.env.AbstractTestEnv;
import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
@@ -51,6 +56,7 @@ public class ExecuteQueryIT {
@ClassRule public static TestEnvRule testEnvRule = new TestEnvRule();
private static BigtableDataClient dataClient;
private static String tableId;
+ private static String schemaBundleId;
private static String cf;
private static String uniquePrefix;
@@ -71,6 +77,7 @@ public static void setUpAll() throws IOException {
dataClient = testEnvRule.env().getDataClient();
cf = testEnvRule.env().getFamilyId();
uniquePrefix = UUID.randomUUID() + "-execute-query-it-";
+ schemaBundleId = UUID.randomUUID() + "-bundle";
dataClient.mutateRow(
RowMutation.create(TableId.of(tableId), uniquePrefix + "a")
@@ -154,7 +161,9 @@ public void withHistoryQuery() {
@SuppressWarnings("DoubleBraceInitialization")
@Test
- public void allTypes() {
+ public void allTypes() throws Exception {
+ createTestSchemaBundle();
+ Album album = Album.newBuilder().setTitle("Lover").build();
PreparedStatement preparedStatement =
dataClient.prepareStatement(
"SELECT 'stringVal' AS strCol, b'foo' as bytesCol, 1 AS intCol, CAST(1.2 AS FLOAT32) as"
@@ -162,7 +171,12 @@ public void allTypes() {
+ " TIMESTAMP_FROM_UNIX_MILLIS(1000) AS tsCol, DATE(2024, 06, 01) as dateCol,"
+ " STRUCT(1 as a, \"foo\" as b) AS structCol, [1,2,3] AS arrCol, "
+ cf
- + " as mapCol FROM `"
+ + " as mapCol, "
+ + " CAST(b'\022\005Lover' AS `"
+ + schemaBundleId
+ + ".com.google.cloud.bigtable.data.v2.test.Album`) as protoCol, CAST('JAZZ' AS `"
+ + schemaBundleId
+ + ".com.google.cloud.bigtable.data.v2.test.Genre`) as enumCol FROM `"
+ tableId
+ "` WHERE _key='"
+ uniquePrefix
@@ -213,9 +227,13 @@ public void allTypes() {
put(ByteString.copyFromUtf8("qual3"), ByteString.copyFromUtf8("val3"));
}
});
-
+ assertThat(rs.getProtoMessage("protoCol", Album.getDefaultInstance())).isEqualTo(album);
+ assertThat(rs.getProtoMessage(11, Album.getDefaultInstance())).isEqualTo(album);
+ assertThat(rs.getProtoEnum("enumCol", Genre::forNumber)).isEqualTo(Genre.JAZZ);
+ assertThat(rs.getProtoEnum(12, Genre::forNumber)).isEqualTo(Genre.JAZZ);
assertThat(rs.next()).isFalse();
}
+ deleteTestSchemaBundle();
}
@Test
@@ -380,4 +398,25 @@ public void testNullColumns() {
assertThat(rs.next()).isFalse();
}
}
+
+ private static void deleteTestSchemaBundle() {
+ testEnvRule.env().getTableAdminClient().deleteSchemaBundle(tableId, schemaBundleId);
+ }
+
+ private static void createTestSchemaBundle() throws Exception {
+ FileDescriptorSet fileDescriptorSet =
+ FileDescriptorSet.newBuilder()
+ .addFile(Singer.getDescriptor().getFile().toProto())
+ .addFile(Album.getDescriptor().getFile().toProto())
+ .build();
+ CreateSchemaBundleRequest request =
+ CreateSchemaBundleRequest.of(tableId, schemaBundleId)
+ .setProtoSchema(fileDescriptorSet.toByteString());
+ testEnvRule.env().getTableAdminClient().createSchemaBundle(request);
+
+ // For some reason the ExecuteQuery data path sometimes cannot resolve a newly-created schema
+ // bundle immediately after its creation. Adding a manual sleep to avoid test flakiness until
+ // the underlying issue is resolved.
+ Thread.sleep(5000);
+ }
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/ChannelPoolMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/ChannelPoolMetricsTracerTest.java
new file mode 100644
index 0000000000..bbe26b2030
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/ChannelPoolMetricsTracerTest.java
@@ -0,0 +1,342 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.OUTSTANDING_RPCS_PER_CHANNEL_NAME;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.PER_CONNECTION_ERROR_COUNT_NAME;
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.bigtable.gaxx.grpc.BigtableChannelObserver;
+import com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPoolObserver;
+import com.google.common.collect.ImmutableList;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.stubbing.Answer;
+
+@RunWith(JUnit4.class)
+public class ChannelPoolMetricsTracerTest {
+
+ @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+
+ private InMemoryMetricReader metricReader;
+ @Mock private ScheduledExecutorService mockScheduler;
+ private ArgumentCaptor runnableCaptor;
+
+ private ChannelPoolMetricsTracer tracker;
+ private Attributes baseAttributes;
+
+ @Mock private BigtableChannelPoolObserver mockInsightsProvider;
+ @Mock private BigtableChannelObserver mockInsight1;
+ @Mock private BigtableChannelObserver mockInsight2;
+
+ @Before
+ public void setUp() {
+ metricReader = InMemoryMetricReader.create();
+ SdkMeterProvider meterProvider =
+ SdkMeterProvider.builder().registerMetricReader(metricReader).build();
+ OpenTelemetry openTelemetry =
+ OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
+
+ baseAttributes = Attributes.builder().build();
+
+ tracker = new ChannelPoolMetricsTracer(openTelemetry, baseAttributes);
+
+ runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+ // Configure mockScheduler to capture the runnable when tracker.start() is called
+ when(mockScheduler.scheduleAtFixedRate(runnableCaptor.capture(), anyLong(), anyLong(), any()))
+ .then((Answer>) invocation -> Mockito.mock(ScheduledFuture.class));
+
+ // Default stubbing for insights provider
+ List defaultInsights = ImmutableList.of(mockInsight1, mockInsight2);
+ when(mockInsightsProvider.getChannelInfos()).thenAnswer(invocation -> defaultInsights);
+ // Default stubbing for observer methods
+ when(mockInsight1.getOutstandingUnaryRpcs()).thenReturn(0);
+ when(mockInsight1.getOutstandingStreamingRpcs()).thenReturn(0);
+ when(mockInsight1.getAndResetErrorCount()).thenReturn(0L);
+ when(mockInsight1.getAndResetSuccessCount()).thenReturn(0L);
+ when(mockInsight2.getOutstandingUnaryRpcs()).thenReturn(0);
+ when(mockInsight2.getOutstandingStreamingRpcs()).thenReturn(0);
+ when(mockInsight2.getAndResetErrorCount()).thenReturn(0L);
+ when(mockInsight2.getAndResetSuccessCount()).thenReturn(0L);
+ when(mockInsight1.isAltsChannel()).thenReturn(false);
+ when(mockInsight2.isAltsChannel()).thenReturn(false);
+ }
+
+ /** Helper to run the captured ChannelPoolMetricsTracer task. */
+ void runTrackerTask() {
+ List capturedRunnables = runnableCaptor.getAllValues();
+ assertThat(capturedRunnables).hasSize(1); // Expect only one task scheduled
+ Runnable trackerRunnable = capturedRunnables.get(0);
+ assertThat(trackerRunnable).isInstanceOf(ChannelPoolMetricsTracer.class);
+ trackerRunnable.run();
+ }
+
+ private Attributes getExpectedErrorAttributes() {
+ return Attributes.builder().build();
+ }
+
+ private static Attributes getExpectedRpcAttributes(String lbPolicy, boolean streaming) {
+ return Attributes.builder()
+ .put(AttributeKey.stringKey("transport_type"), "CLOUDPATH")
+ .put(AttributeKey.stringKey("lb_policy"), lbPolicy)
+ .put(AttributeKey.booleanKey("streaming"), streaming)
+ .build();
+ }
+
+ private static Optional getMetricData(
+ Collection metrics, String metricName) {
+ return metrics.stream().filter(m -> m.getName().equals(metricName)).findFirst();
+ }
+
+ private static HistogramPointData getPointForStreaming(
+ Collection points, boolean streaming) {
+ return points.stream()
+ .filter(
+ p ->
+ Boolean.TRUE.equals(p.getAttributes().get(AttributeKey.booleanKey("streaming")))
+ == streaming)
+ .findFirst()
+ .orElseThrow(
+ () -> new AssertionError("Missing HistogramPointData for streaming=" + streaming));
+ }
+
+ /** Helper to create expected Attributes for assertions. */
+ private static Attributes getExpectedAttributes(String lbPolicy, boolean streaming) {
+ return Attributes.builder()
+ .put(AttributeKey.stringKey("transport_type"), "grpc")
+ .put(AttributeKey.stringKey("lb_policy"), lbPolicy)
+ .put(AttributeKey.booleanKey("streaming"), streaming)
+ .build();
+ }
+
+ @Test
+ public void testSingleRun() {
+ // Arrange
+ tracker.registerChannelInsightsProvider(mockInsightsProvider);
+ tracker.registerLoadBalancingStrategy("LEAST_IN_FLIGHT");
+ tracker.start(mockScheduler);
+
+ // Outstanding RPCs
+ when(mockInsight1.getOutstandingUnaryRpcs()).thenReturn(5);
+ when(mockInsight1.getOutstandingStreamingRpcs()).thenReturn(2);
+ when(mockInsight2.getOutstandingUnaryRpcs()).thenReturn(10);
+ when(mockInsight2.getOutstandingStreamingRpcs()).thenReturn(8);
+ // Error Counts
+ when(mockInsight1.getAndResetErrorCount()).thenReturn(1L);
+ when(mockInsight1.getAndResetSuccessCount()).thenReturn(9L);
+ when(mockInsight2.getAndResetErrorCount()).thenReturn(3L);
+ when(mockInsight2.getAndResetSuccessCount()).thenReturn(7L);
+
+ runTrackerTask();
+
+ // Assert
+ Collection metrics = metricReader.collectAllMetrics();
+ assertThat(metrics).hasSize(2);
+
+ // Assert Outstanding RPCs metric
+ Optional rpcMetricDataOpt =
+ getMetricData(metrics, OUTSTANDING_RPCS_PER_CHANNEL_NAME);
+ assertThat(rpcMetricDataOpt.isPresent()).isTrue();
+ MetricData rpcMetricData = rpcMetricDataOpt.get();
+ Collection rpcPoints = rpcMetricData.getHistogramData().getPoints();
+ assertThat(rpcPoints).hasSize(2); // One for streaming=false, one for streaming=true
+
+ HistogramPointData unaryPoint = getPointForStreaming(rpcPoints, false);
+ assertThat(unaryPoint.getAttributes())
+ .isEqualTo(getExpectedRpcAttributes("LEAST_IN_FLIGHT", false));
+ assertThat(unaryPoint.getCount()).isEqualTo(2);
+ assertThat(unaryPoint.getSum()).isWithin(1e-9).of(15.0); // 5 + 10
+
+ HistogramPointData streamingPoint = getPointForStreaming(rpcPoints, true);
+ assertThat(streamingPoint.getAttributes())
+ .isEqualTo(getExpectedRpcAttributes("LEAST_IN_FLIGHT", true));
+ assertThat(streamingPoint.getCount()).isEqualTo(2);
+ assertThat(streamingPoint.getSum()).isWithin(1e-9).of(10.0); // 2 + 8
+
+ // Assert Error Count metric
+ Optional errorMetricDataOpt =
+ getMetricData(metrics, PER_CONNECTION_ERROR_COUNT_NAME);
+ assertThat(errorMetricDataOpt.isPresent()).isTrue();
+ MetricData errorMetricData = errorMetricDataOpt.get();
+ Collection errorPoints = errorMetricData.getHistogramData().getPoints();
+ assertThat(errorPoints).hasSize(1);
+
+ HistogramPointData errorPoint = errorPoints.iterator().next();
+ assertThat(errorPoint.getAttributes()).isEqualTo(getExpectedErrorAttributes());
+ assertThat(errorPoint.getCount()).isEqualTo(2); // Two insights
+ assertThat(errorPoint.getSum()).isWithin(1e-9).of(4.0); // 1 + 3
+ }
+
+ @Test
+ public void testMultipleRuns() {
+ // Arrange
+ tracker.registerChannelInsightsProvider(mockInsightsProvider);
+ tracker.registerLoadBalancingStrategy("ROUND_ROBIN");
+ tracker.start(mockScheduler);
+
+ // First run
+ when(mockInsight1.getOutstandingUnaryRpcs()).thenReturn(1);
+ when(mockInsight1.getOutstandingStreamingRpcs()).thenReturn(2);
+ when(mockInsight2.getOutstandingUnaryRpcs()).thenReturn(3);
+ when(mockInsight2.getOutstandingStreamingRpcs()).thenReturn(4);
+ when(mockInsight1.getAndResetErrorCount()).thenReturn(1L);
+ when(mockInsight1.getAndResetSuccessCount()).thenReturn(1L);
+ when(mockInsight2.getAndResetErrorCount()).thenReturn(0L);
+ when(mockInsight2.getAndResetSuccessCount()).thenReturn(2L);
+ runTrackerTask();
+
+ // Second run - values change
+ when(mockInsight1.getOutstandingUnaryRpcs()).thenReturn(10);
+ when(mockInsight1.getOutstandingStreamingRpcs()).thenReturn(20);
+ when(mockInsight2.getOutstandingUnaryRpcs()).thenReturn(30);
+ when(mockInsight2.getOutstandingStreamingRpcs()).thenReturn(40);
+ when(mockInsight1.getAndResetErrorCount()).thenReturn(5L);
+ when(mockInsight1.getAndResetSuccessCount()).thenReturn(5L);
+ when(mockInsight2.getAndResetErrorCount()).thenReturn(2L);
+ when(mockInsight2.getAndResetSuccessCount()).thenReturn(8L);
+ runTrackerTask();
+
+ // Assert cumulative metrics
+ Collection metrics = metricReader.collectAllMetrics();
+ assertThat(metrics).hasSize(2);
+
+ // Assert Outstanding RPCs
+ Optional rpcMetricDataOpt =
+ getMetricData(metrics, OUTSTANDING_RPCS_PER_CHANNEL_NAME);
+ assertThat(rpcMetricDataOpt.isPresent()).isTrue();
+ Collection rpcPoints =
+ rpcMetricDataOpt.get().getHistogramData().getPoints();
+ assertThat(rpcPoints).hasSize(2);
+
+ HistogramPointData unaryPoint = getPointForStreaming(rpcPoints, false);
+ assertThat(unaryPoint.getCount()).isEqualTo(4); // 2 insights * 2 runs
+ assertThat(unaryPoint.getSum()).isWithin(1e-9).of(44.0); // (1 + 3) + (10 + 30)
+
+ HistogramPointData streamingPoint = getPointForStreaming(rpcPoints, true);
+ assertThat(streamingPoint.getCount()).isEqualTo(4); // 2 insights * 2 runs
+ assertThat(streamingPoint.getSum()).isWithin(1e-9).of(66.0); // (2 + 4) + (20 + 40)
+
+ // Assert Error Counts
+ Optional errorMetricDataOpt =
+ getMetricData(metrics, PER_CONNECTION_ERROR_COUNT_NAME);
+ assertThat(errorMetricDataOpt.isPresent()).isTrue();
+ Collection errorPoints =
+ errorMetricDataOpt.get().getHistogramData().getPoints();
+ assertThat(errorPoints).hasSize(1);
+
+ HistogramPointData errorPoint = errorPoints.iterator().next();
+ assertThat(errorPoint.getAttributes()).isEqualTo(getExpectedErrorAttributes());
+ assertThat(errorPoint.getCount()).isEqualTo(4); // 2 insights * 2 runs
+ assertThat(errorPoint.getSum()).isWithin(1e-9).of(8.0); // (1 + 0) + (5 + 2)
+ }
+
+ @Test
+ public void testErrorMetricsOnlyRecordedForAllChannels() {
+ // Arrange
+ tracker.registerChannelInsightsProvider(mockInsightsProvider);
+ tracker.start(mockScheduler);
+
+ // Insight 1: Active (has successes)
+ when(mockInsight1.getAndResetErrorCount()).thenReturn(0L);
+ when(mockInsight1.getAndResetSuccessCount()).thenReturn(5L);
+ // Insight 2: Inactive
+ when(mockInsight2.getAndResetErrorCount()).thenReturn(0L);
+ when(mockInsight2.getAndResetSuccessCount()).thenReturn(0L);
+
+ runTrackerTask();
+
+ Collection metrics = metricReader.collectAllMetrics();
+ Optional errorMetricDataOpt =
+ getMetricData(metrics, PER_CONNECTION_ERROR_COUNT_NAME);
+ assertThat(errorMetricDataOpt.isPresent()).isTrue();
+ Collection errorPoints =
+ errorMetricDataOpt.get().getHistogramData().getPoints();
+ assertThat(errorPoints).hasSize(1);
+
+ HistogramPointData errorPoint = errorPoints.iterator().next();
+ assertThat(errorPoint.getAttributes()).isEqualTo(getExpectedErrorAttributes());
+ assertThat(errorPoint.getCount()).isEqualTo(2); // both channel recorded
+ assertThat(errorPoint.getSum()).isWithin(1e-9).of(0.0); // Recorded 0 errors
+ }
+
+ @Test
+ public void testDefaultLbPolicy() {
+ // Arrange: Only register insights provider, not LB strategy
+ tracker.registerChannelInsightsProvider(mockInsightsProvider);
+ tracker.start(mockScheduler);
+ runTrackerTask();
+
+ Collection metrics = metricReader.collectAllMetrics();
+ Optional rpcMetricDataOpt =
+ getMetricData(metrics, OUTSTANDING_RPCS_PER_CHANNEL_NAME);
+ assertThat(rpcMetricDataOpt.isPresent()).isTrue();
+ Collection points = rpcMetricDataOpt.get().getHistogramData().getPoints();
+
+ points.forEach(
+ point ->
+ assertThat(point.getAttributes().asMap())
+ .containsEntry(AttributeKey.stringKey("lb_policy"), "ROUND_ROBIN"));
+ }
+
+ @Test
+ public void testNoMetricsIfChannelInsightsProviderInactive() {
+ tracker.start(mockScheduler);
+ runTrackerTask();
+ assertThat(metricReader.collectAllMetrics()).isEmpty();
+ }
+
+ @Test
+ public void testNoMetricsIfChannelInsightsEmpty() {
+ tracker.registerChannelInsightsProvider(mockInsightsProvider);
+ when(mockInsightsProvider.getChannelInfos()).thenReturn(ImmutableList.of());
+ tracker.start(mockScheduler);
+ runTrackerTask();
+ assertThat(metricReader.collectAllMetrics()).isEmpty();
+ }
+
+ @Test
+ public void testNoMetricsIfChannelInsightsNull() {
+ tracker.registerChannelInsightsProvider(mockInsightsProvider);
+ when(mockInsightsProvider.getChannelInfos()).thenReturn(null);
+ tracker.start(mockScheduler);
+ runTrackerTask();
+ assertThat(metricReader.collectAllMetrics()).isEmpty();
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionTest.java
deleted file mode 100644
index 94beeff6f7..0000000000
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionTest.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Copyright 2024 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub.metrics;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.when;
-
-import com.google.api.gax.core.FixedExecutorProvider;
-import com.google.api.gax.grpc.ChannelPoolSettings;
-import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
-import com.google.bigtable.v2.*;
-import com.google.cloud.bigtable.Version;
-import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
-import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
-import com.google.cloud.bigtable.data.v2.models.*;
-import com.google.cloud.bigtable.data.v2.models.Row;
-import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
-import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
-import com.google.common.collect.Lists;
-import io.grpc.Server;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
-import io.grpc.stub.StreamObserver;
-import io.opentelemetry.api.common.Attributes;
-import io.opentelemetry.sdk.OpenTelemetrySdk;
-import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.SdkMeterProvider;
-import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
-import io.opentelemetry.sdk.metrics.View;
-import io.opentelemetry.sdk.metrics.data.HistogramPointData;
-import io.opentelemetry.sdk.metrics.data.MetricData;
-import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-
-@RunWith(JUnit4.class)
-public class ErrorCountPerConnectionTest {
- private static final String SUCCESS_TABLE_NAME = "some-table";
- private static final String ERROR_TABLE_NAME = "nonexistent-table";
- private Server server;
- private final FakeService fakeService = new FakeService();
- private EnhancedBigtableStubSettings.Builder builder;
- private ArgumentCaptor runnableCaptor;
-
- private InMemoryMetricReader metricReader;
-
- private Attributes attributes;
-
- @Before
- public void setup() throws Exception {
- server = FakeServiceBuilder.create(fakeService).start();
-
- ScheduledExecutorService executors = Mockito.mock(ScheduledExecutorService.class);
-
- attributes =
- Attributes.builder()
- .put(BuiltinMetricsConstants.BIGTABLE_PROJECT_ID_KEY, "fake-project")
- .put(BuiltinMetricsConstants.INSTANCE_ID_KEY, "fake-instance")
- .put(BuiltinMetricsConstants.APP_PROFILE_KEY, "")
- .put(BuiltinMetricsConstants.CLIENT_NAME_KEY, "bigtable-java/" + Version.VERSION)
- .build();
-
- metricReader = InMemoryMetricReader.create();
-
- SdkMeterProviderBuilder meterProvider =
- SdkMeterProvider.builder().registerMetricReader(metricReader);
-
- for (Map.Entry e :
- BuiltinMetricsConstants.getInternalViews().entrySet()) {
- meterProvider.registerView(e.getKey(), e.getValue());
- }
-
- OpenTelemetrySdk otel =
- OpenTelemetrySdk.builder().setMeterProvider(meterProvider.build()).build();
-
- builder =
- BigtableDataSettings.newBuilderForEmulator(server.getPort())
- .stubSettings()
- .setBackgroundExecutorProvider(FixedExecutorProvider.create(executors))
- .setProjectId("fake-project")
- .setInstanceId("fake-instance")
- .setMetricsProvider(NoopMetricsProvider.INSTANCE)
- .setInternalMetricsProvider((ignored1, ignored2) -> otel);
-
- runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
- when(executors.scheduleAtFixedRate(runnableCaptor.capture(), anyLong(), anyLong(), any()))
- .then((Answer>) invocation -> Mockito.mock(ScheduledFuture.class));
- }
-
- @After
- public void tearDown() throws Exception {
- if (server != null) {
- server.shutdown();
- }
- }
-
- @Test
- public void readWithOneChannel() throws Exception {
- long errorCount = 0;
-
- try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(builder.build())) {
- for (int i = 0; i < 20; i++) {
- Query query;
- if (i % 3 == 0) {
- query = Query.create(ERROR_TABLE_NAME);
- errorCount += 1;
- } else {
- query = Query.create(SUCCESS_TABLE_NAME);
- }
- try {
- @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
- ArrayList ignored = Lists.newArrayList(stub.readRowsCallable().call(query));
- } catch (Exception e) {
- // noop
- }
- }
- }
-
- runInterceptorTasksAndAssertCount();
-
- MetricData metricData =
- BuiltinMetricsTestUtils.getMetricData(
- metricReader, BuiltinMetricsConstants.PER_CONNECTION_ERROR_COUNT_NAME);
-
- // Make sure the correct bucket is updated with the correct number of data points
- ArrayList histogramPointData =
- new ArrayList<>(metricData.getHistogramData().getPoints());
- assertThat(histogramPointData.size()).isEqualTo(1);
- HistogramPointData point = histogramPointData.get(0);
- int index = findDataPointIndex(point.getBoundaries(), errorCount);
- assertThat(point.getCounts().get(index)).isEqualTo(1);
- }
-
- @Test
- public void readWithTwoChannels() throws Exception {
- EnhancedBigtableStubSettings.Builder builderWithTwoChannels =
- builder.setTransportChannelProvider(
- ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider())
- .toBuilder()
- .setChannelPoolSettings(ChannelPoolSettings.staticallySized(2))
- .build());
- long totalErrorCount = 0;
- try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(builderWithTwoChannels.build())) {
- for (int i = 0; i < 20; i++) {
- try {
- if (i < 10) {
- totalErrorCount += 1;
- @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
- ArrayList ignored =
- Lists.newArrayList(stub.readRowsCallable().call(Query.create(ERROR_TABLE_NAME)));
- } else {
- ArrayList ignored =
- Lists.newArrayList(stub.readRowsCallable().call(Query.create(SUCCESS_TABLE_NAME)));
- }
- } catch (Exception e) {
- // noop
- }
- }
- }
- runInterceptorTasksAndAssertCount();
-
- long errorCountPerChannel = totalErrorCount / 2;
-
- MetricData metricData =
- BuiltinMetricsTestUtils.getMetricData(
- metricReader, BuiltinMetricsConstants.PER_CONNECTION_ERROR_COUNT_NAME);
-
- // The 2 channels should get equal amount of errors, so the totalErrorCount / 2 bucket is
- // updated twice.
- ArrayList histogramPointData =
- new ArrayList<>(metricData.getHistogramData().getPoints());
- assertThat(histogramPointData.size()).isEqualTo(1);
- HistogramPointData point = histogramPointData.get(0);
- int index = findDataPointIndex(point.getBoundaries(), errorCountPerChannel);
- assertThat(point.getCounts().get(index)).isEqualTo(2);
- }
-
- @Test
- public void readOverTwoPeriods() throws Exception {
- long errorCount1 = 0;
- long errorCount2 = 0;
- try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(builder.build())) {
-
- for (int i = 0; i < 20; i++) {
- Query query;
- if (i % 3 == 0) {
- query = Query.create(ERROR_TABLE_NAME);
- errorCount1 += 1;
- } else {
- query = Query.create(SUCCESS_TABLE_NAME);
- }
- try {
- @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
- ArrayList ignored = Lists.newArrayList(stub.readRowsCallable().call(query));
- } catch (Exception e) {
- // noop
- }
- }
-
- runInterceptorTasksAndAssertCount();
-
- for (int i = 0; i < 20; i++) {
- Query query;
- if (i % 3 == 0) {
- query = Query.create(SUCCESS_TABLE_NAME);
- } else {
- query = Query.create(ERROR_TABLE_NAME);
- errorCount2 += 1;
- }
- try {
- @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
- ArrayList ignored = Lists.newArrayList(stub.readRowsCallable().call(query));
- } catch (Exception e) {
- // noop
- }
- }
- }
-
- runInterceptorTasksAndAssertCount();
-
- MetricData metricData =
- BuiltinMetricsTestUtils.getMetricData(
- metricReader, BuiltinMetricsConstants.PER_CONNECTION_ERROR_COUNT_NAME);
-
- ArrayList histogramPointData =
- new ArrayList<>(metricData.getHistogramData().getPoints());
- assertThat(histogramPointData.size()).isEqualTo(1);
- HistogramPointData point = histogramPointData.get(0);
- int index1 = findDataPointIndex(point.getBoundaries(), errorCount1);
- int index2 = findDataPointIndex(point.getBoundaries(), errorCount2);
- assertThat(point.getCounts().get(index1)).isEqualTo(1);
- assertThat(point.getCounts().get(index2)).isEqualTo(1);
- }
-
- @Test
- public void noFailedRequests() throws Exception {
- try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(builder.build())) {
- for (int i = 0; i < 20; i++) {
- try {
- @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
- ArrayList ignored =
- Lists.newArrayList(stub.readRowsCallable().call(Query.create(SUCCESS_TABLE_NAME)));
- } catch (Exception e) {
- // noop
- }
- }
- }
-
- runInterceptorTasksAndAssertCount();
- MetricData metricData =
- BuiltinMetricsTestUtils.getMetricData(
- metricReader, BuiltinMetricsConstants.PER_CONNECTION_ERROR_COUNT_NAME);
- long value = BuiltinMetricsTestUtils.getAggregatedValue(metricData, attributes);
- assertThat(value).isEqualTo(0);
- }
-
- private void runInterceptorTasksAndAssertCount() {
- int actualNumOfTasks = 0;
- for (Runnable runnable : runnableCaptor.getAllValues()) {
- if (runnable instanceof ErrorCountPerConnectionMetricTracker) {
- runnable.run();
- actualNumOfTasks++;
- }
- }
- assertThat(actualNumOfTasks).isEqualTo(1);
- }
-
- private int findDataPointIndex(List boundaries, long dataPoint) {
- int index = 0;
- for (; index < boundaries.size(); index++) {
- if (boundaries.get(index) >= dataPoint) {
- break;
- }
- }
- return index;
- }
-
- static class FakeService extends BigtableGrpc.BigtableImplBase {
- @Override
- public void readRows(
- ReadRowsRequest request, StreamObserver responseObserver) {
- if (request.getTableName().contains(SUCCESS_TABLE_NAME)) {
- responseObserver.onNext(ReadRowsResponse.getDefaultInstance());
- responseObserver.onCompleted();
- } else {
- // Send a non-retriable error, since otherwise the client tries to use the mocked
- // ScheduledExecutorService object for retyring, resulting in a hang.
- StatusRuntimeException exception = new StatusRuntimeException(Status.INTERNAL);
- responseObserver.onError(exception);
- }
- }
- }
-}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolSettingsTest.java
index 28d5a43738..7fb35308ea 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolSettingsTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolSettingsTest.java
@@ -19,6 +19,7 @@
import com.google.api.gax.grpc.ChannelPoolSettings;
import com.google.common.collect.ImmutableSet;
+import io.grpc.ManagedChannel;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
@@ -27,6 +28,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
@RunWith(JUnit4.class)
public class BigtableChannelPoolSettingsTest {
@@ -48,6 +50,86 @@ public void testToBigtableChannelPoolSettingsAllFieldsSetCopiesCorrectly() throw
assertSettingsCopiedCorrectly(originalSettings, copiedSettings);
}
+ @Test
+ public void testEntryRetainReleaseByType() {
+ ManagedChannel mockChannel = Mockito.mock(ManagedChannel.class);
+ BigtableChannelPool.Entry entry = new BigtableChannelPool.Entry(mockChannel);
+
+ // Test Unary
+ assertThat(entry.retain(false)).isTrue(); // Unary
+ assertThat(entry.outstandingUnaryRpcs.get()).isEqualTo(1);
+ assertThat(entry.outstandingStreamingRpcs.get()).isEqualTo(0);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(1);
+ // Test Unary release
+ entry.release(false);
+ assertThat(entry.outstandingUnaryRpcs.get()).isEqualTo(0);
+ assertThat(entry.outstandingStreamingRpcs.get()).isEqualTo(0);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(0);
+
+ // Test Streaming
+ assertThat(entry.retain(true)).isTrue(); // Streaming
+ assertThat(entry.outstandingUnaryRpcs.get()).isEqualTo(0);
+ assertThat(entry.outstandingStreamingRpcs.get()).isEqualTo(1);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(1);
+ // Test Streaming again
+ assertThat(entry.retain(true)).isTrue(); // Streaming again
+ assertThat(entry.outstandingStreamingRpcs.get()).isEqualTo(2);
+ assertThat(entry.outstandingUnaryRpcs.get()).isEqualTo(0);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(2);
+
+ entry.release(true);
+ assertThat(entry.outstandingStreamingRpcs.get()).isEqualTo(1);
+ assertThat(entry.outstandingUnaryRpcs.get()).isEqualTo(0);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(1);
+
+ entry.release(true);
+ assertThat(entry.outstandingStreamingRpcs.get()).isEqualTo(0);
+ assertThat(entry.outstandingUnaryRpcs.get()).isEqualTo(0);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(0);
+
+ // Test Error Counting
+ entry.incrementErrorCount();
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(1);
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(0); // Should be reset
+
+ entry.incrementErrorCount();
+ entry.incrementErrorCount();
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(2);
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(0);
+
+ // Test Success Counting
+ entry.incrementSuccessCount();
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(1);
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(0); // Should be reset
+
+ entry.incrementSuccessCount();
+ entry.incrementSuccessCount();
+ entry.incrementSuccessCount();
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(3);
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(0);
+
+ // Test Mixed Error and Success Counting
+ entry.incrementErrorCount();
+ entry.incrementSuccessCount();
+ entry.incrementSuccessCount();
+ entry.incrementErrorCount();
+ entry.incrementSuccessCount();
+
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(2);
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(3);
+
+ // Verify reset after mixed
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(0);
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(0);
+
+ // Ensure retain/release doesn't affect error/success counts
+ entry.incrementErrorCount();
+ entry.retain(false);
+ entry.release(false);
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(1);
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(0);
+ }
+
@Test
public void testToBigtableChannelPoolSettingsDefaultValuesCopiesCorrectly() throws Exception {
ChannelPoolSettings originalSettings = ChannelPoolSettings.builder().build();
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolTest.java
new file mode 100644
index 0000000000..caed478e0a
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolTest.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.gaxx.grpc;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import com.google.api.gax.grpc.ChannelFactory;
+import com.google.common.collect.Iterables;
+import io.grpc.Attributes;
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@RunWith(JUnit4.class)
+public class BigtableChannelPoolTest {
+ @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+
+ @Mock private ChannelFactory mockChannelFactory;
+ @Mock private ChannelPrimer mockChannelPrimer;
+ @Mock private ManagedChannel mockChannel;
+ @Mock private ClientCall mockClientCall;
+
+ private MethodDescriptor unaryMethodDescriptor;
+ private MethodDescriptor streamingMethodDescriptor;
+
+ @Captor private ArgumentCaptor> listenerCaptor;
+
+ private BigtableChannelPool channelPool;
+ private ScheduledExecutorService executorService;
+
+ private static class StringMarshaller implements MethodDescriptor.Marshaller {
+ @Override
+ public InputStream stream(String value) {
+ return null; // Not used in this test
+ }
+
+ @Override
+ public String parse(InputStream stream) {
+ return null; // Not used in this test
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ when(mockChannelFactory.createSingleChannel()).thenReturn(mockChannel);
+ when(mockChannel.newCall(
+ ArgumentMatchers.>any(), any(CallOptions.class)))
+ .thenReturn(mockClientCall);
+ // Setup MethodDescriptors
+ // Initialize real MethodDescriptor instances
+ MethodDescriptor.Marshaller marshaller = new StringMarshaller();
+ unaryMethodDescriptor =
+ MethodDescriptor.newBuilder()
+ .setType(MethodDescriptor.MethodType.UNARY)
+ .setFullMethodName(MethodDescriptor.generateFullMethodName("bigtable", "MutateRow"))
+ .setRequestMarshaller(marshaller)
+ .setResponseMarshaller(marshaller)
+ .build();
+
+ streamingMethodDescriptor =
+ MethodDescriptor.newBuilder()
+ .setType(MethodDescriptor.MethodType.SERVER_STREAMING)
+ .setFullMethodName(MethodDescriptor.generateFullMethodName("bigtable", "ReadRows"))
+ .setRequestMarshaller(marshaller)
+ .setResponseMarshaller(marshaller)
+ .build();
+
+ executorService = Executors.newSingleThreadScheduledExecutor();
+
+ BigtableChannelPoolSettings settings =
+ BigtableChannelPoolSettings.builder()
+ .setInitialChannelCount(1)
+ .setMinChannelCount(1)
+ .setMaxChannelCount(1)
+ .build();
+ channelPool =
+ new BigtableChannelPool(settings, mockChannelFactory, mockChannelPrimer, executorService);
+
+ // Capture the listener when start is called
+ // Configure mockClientCall.start to capture the listener
+ doNothing().when(mockClientCall).start(listenerCaptor.capture(), any(Metadata.class));
+ // Default to no ALTS context
+ when(mockClientCall.getAttributes()).thenReturn(Attributes.EMPTY);
+ }
+
+ private BigtableChannelPool.Entry getSingleEntry() {
+ List extends BigtableChannelObserver> infos = channelPool.getChannelInfos();
+ return (BigtableChannelPool.Entry) Iterables.getOnlyElement(infos);
+ }
+
+ private ClientCall.Listener startCall(MethodDescriptor method) {
+ ClientCall call = channelPool.newCall(method, CallOptions.DEFAULT);
+ call.start(mock(ClientCall.Listener.class), new Metadata());
+ return listenerCaptor.getValue();
+ }
+
+ @Test
+ public void testUnaryRpcSuccess() {
+ BigtableChannelPool.Entry entry = getSingleEntry();
+
+ // Before call
+ assertThat(entry.getOutstandingUnaryRpcs()).isEqualTo(0);
+ assertThat(entry.getOutstandingStreamingRpcs()).isEqualTo(0);
+
+ ClientCall.Listener listener = startCall(unaryMethodDescriptor);
+
+ // After start
+ assertThat(entry.getOutstandingUnaryRpcs()).isEqualTo(1);
+ assertThat(entry.getOutstandingStreamingRpcs()).isEqualTo(0);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(1);
+
+ // Simulate call success
+ listener.onClose(Status.OK, new Metadata());
+
+ // After close
+ assertThat(entry.getOutstandingUnaryRpcs()).isEqualTo(0);
+ assertThat(entry.getOutstandingStreamingRpcs()).isEqualTo(0);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(0);
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(1);
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testUnaryRpcFailure() {
+ BigtableChannelPool.Entry entry = getSingleEntry();
+ ClientCall.Listener listener = startCall(unaryMethodDescriptor);
+
+ assertThat(entry.getOutstandingUnaryRpcs()).isEqualTo(1);
+
+ // Simulate call failure
+ listener.onClose(Status.UNAVAILABLE, new Metadata());
+
+ // After close
+ assertThat(entry.getOutstandingUnaryRpcs()).isEqualTo(0);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(0);
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(0);
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(1);
+ }
+
+ @Test
+ public void testStreamingRpcSuccess() {
+ BigtableChannelPool.Entry entry = getSingleEntry();
+
+ ClientCall.Listener listener = startCall(streamingMethodDescriptor);
+
+ // After start
+ assertThat(entry.getOutstandingUnaryRpcs()).isEqualTo(0);
+ assertThat(entry.getOutstandingStreamingRpcs()).isEqualTo(1);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(1);
+
+ // Simulate call success
+ listener.onClose(Status.OK, new Metadata());
+
+ // After close
+ assertThat(entry.getOutstandingStreamingRpcs()).isEqualTo(0);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(0);
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(1);
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testStreamingRpcFailure() {
+ BigtableChannelPool.Entry entry = getSingleEntry();
+ ClientCall.Listener listener = startCall(streamingMethodDescriptor);
+
+ assertThat(entry.getOutstandingStreamingRpcs()).isEqualTo(1);
+
+ // Simulate call failure
+ listener.onClose(Status.DEADLINE_EXCEEDED, new Metadata());
+
+ // After close
+ assertThat(entry.getOutstandingStreamingRpcs()).isEqualTo(0);
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(0);
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(0);
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(1);
+ }
+
+ @Test
+ public void testMixedRpcs() {
+ BigtableChannelPool.Entry entry = getSingleEntry();
+
+ // 1 Unary OK
+ startCall(unaryMethodDescriptor).onClose(Status.OK, new Metadata());
+ // 1 Unary Fail
+ startCall(unaryMethodDescriptor).onClose(Status.INTERNAL, new Metadata());
+ // 1 Streaming OK
+ startCall(streamingMethodDescriptor).onClose(Status.OK, new Metadata());
+ // 2 Streaming Fail
+ startCall(streamingMethodDescriptor).onClose(Status.CANCELLED, new Metadata());
+ ClientCall.Listener streamingListener = startCall(streamingMethodDescriptor);
+
+ // Before the last one closes
+ assertThat(entry.getOutstandingUnaryRpcs()).isEqualTo(0);
+ assertThat(entry.getOutstandingStreamingRpcs()).isEqualTo(1);
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(2); // 1 Unary + 1 Streaming
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(2); // 1 Unary + 1 Streaming
+
+ // Close the last one
+ streamingListener.onClose(Status.UNKNOWN, new Metadata());
+ assertThat(entry.getOutstandingStreamingRpcs()).isEqualTo(0);
+ assertThat(entry.getAndResetSuccessCount()).isEqualTo(0);
+ assertThat(entry.getAndResetErrorCount()).isEqualTo(1); // The last failure
+ assertThat(entry.totalOutstandingRpcs()).isEqualTo(0);
+ }
+
+ @Test
+ public void testNonAltsChannelReturnsFalse() {
+ // empty attributes
+ // cannot test true value as logic is complicated.
+ // alts check looks at attributes.get(AltsProtocolNegotiator.AUTH_CONTEXT_KEY);
+ when(mockClientCall.getAttributes()).thenReturn(Attributes.EMPTY);
+ BigtableChannelPool.Entry entry = getSingleEntry();
+ assertThat(entry.isAltsHolder.get()).isNull();
+ startCall(unaryMethodDescriptor);
+ assertThat(entry.isAltsChannel()).isFalse();
+ }
+}
diff --git a/grpc-google-cloud-bigtable-admin-v2/pom.xml b/grpc-google-cloud-bigtable-admin-v2/pom.xml
index 50ebe04a7b..6ae06f900e 100644
--- a/grpc-google-cloud-bigtable-admin-v2/pom.xml
+++ b/grpc-google-cloud-bigtable-admin-v2/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigtable-admin-v2
- 2.68.0
+ 2.69.0
grpc-google-cloud-bigtable-admin-v2
GRPC library for grpc-google-cloud-bigtable-admin-v2
com.google.cloud
google-cloud-bigtable-parent
- 2.68.0
+ 2.69.0
@@ -18,14 +18,14 @@
com.google.cloud
google-cloud-bigtable-deps-bom
- 2.68.0
+ 2.69.0
pom
import
com.google.cloud
google-cloud-bigtable-bom
- 2.68.0
+ 2.69.0
pom
import
diff --git a/grpc-google-cloud-bigtable-v2/pom.xml b/grpc-google-cloud-bigtable-v2/pom.xml
index a37447310c..79f0e79334 100644
--- a/grpc-google-cloud-bigtable-v2/pom.xml
+++ b/grpc-google-cloud-bigtable-v2/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigtable-v2
- 2.68.0
+ 2.69.0
grpc-google-cloud-bigtable-v2
GRPC library for grpc-google-cloud-bigtable-v2
com.google.cloud
google-cloud-bigtable-parent
- 2.68.0
+ 2.69.0
@@ -18,14 +18,14 @@
com.google.cloud
google-cloud-bigtable-deps-bom
- 2.68.0
+ 2.69.0
pom
import
com.google.cloud
google-cloud-bigtable-bom
- 2.68.0
+ 2.69.0
pom
import
diff --git a/pom.xml b/pom.xml
index 166009825a..b91f4c1d2f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
google-cloud-bigtable-parent
pom
- 2.68.0
+ 2.69.0
Google Cloud Bigtable Parent
https://github.com/googleapis/java-bigtable
@@ -156,27 +156,27 @@
com.google.api.grpc
proto-google-cloud-bigtable-v2
- 2.68.0
+ 2.69.0
com.google.api.grpc
proto-google-cloud-bigtable-admin-v2
- 2.68.0
+ 2.69.0
com.google.api.grpc
grpc-google-cloud-bigtable-v2
- 2.68.0
+ 2.69.0
com.google.api.grpc
grpc-google-cloud-bigtable-admin-v2
- 2.68.0
+ 2.69.0
com.google.cloud
google-cloud-bigtable
- 2.68.0
+ 2.69.0
diff --git a/proto-google-cloud-bigtable-admin-v2/pom.xml b/proto-google-cloud-bigtable-admin-v2/pom.xml
index 4a193f76fd..524cb4ecfd 100644
--- a/proto-google-cloud-bigtable-admin-v2/pom.xml
+++ b/proto-google-cloud-bigtable-admin-v2/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-bigtable-admin-v2
- 2.68.0
+ 2.69.0
proto-google-cloud-bigtable-admin-v2
PROTO library for proto-google-cloud-bigtable-admin-v2
com.google.cloud
google-cloud-bigtable-parent
- 2.68.0
+ 2.69.0
@@ -18,14 +18,14 @@
com.google.cloud
google-cloud-bigtable-deps-bom
- 2.68.0
+ 2.69.0
pom
import
com.google.cloud
google-cloud-bigtable-bom
- 2.68.0
+ 2.69.0
pom
import
diff --git a/proto-google-cloud-bigtable-v2/pom.xml b/proto-google-cloud-bigtable-v2/pom.xml
index 804f6628d6..080f08d68c 100644
--- a/proto-google-cloud-bigtable-v2/pom.xml
+++ b/proto-google-cloud-bigtable-v2/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-bigtable-v2
- 2.68.0
+ 2.69.0
proto-google-cloud-bigtable-v2
PROTO library for proto-google-cloud-bigtable-v2
com.google.cloud
google-cloud-bigtable-parent
- 2.68.0
+ 2.69.0
@@ -18,14 +18,14 @@
com.google.cloud
google-cloud-bigtable-deps-bom
- 2.68.0
+ 2.69.0
pom
import
com.google.cloud
google-cloud-bigtable-bom
- 2.68.0
+ 2.69.0
pom
import
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 5811f8136f..f700183283 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
google-cloud-bigtable
- 2.68.0
+ 2.69.0
diff --git a/test-proxy/pom.xml b/test-proxy/pom.xml
index db757703ab..28dbae19a8 100644
--- a/test-proxy/pom.xml
+++ b/test-proxy/pom.xml
@@ -12,11 +12,11 @@
google-cloud-bigtable-parent
com.google.cloud
- 2.68.0
+ 2.69.0
- 2.68.0
+ 2.69.0
diff --git a/versions.txt b/versions.txt
index 5df65f73f4..cf92827dfc 100644
--- a/versions.txt
+++ b/versions.txt
@@ -1,10 +1,10 @@
# Format:
# module:released-version:current-version
-google-cloud-bigtable:2.68.0:2.68.0
-grpc-google-cloud-bigtable-admin-v2:2.68.0:2.68.0
-grpc-google-cloud-bigtable-v2:2.68.0:2.68.0
-proto-google-cloud-bigtable-admin-v2:2.68.0:2.68.0
-proto-google-cloud-bigtable-v2:2.68.0:2.68.0
-google-cloud-bigtable-emulator:0.205.0:0.205.0
-google-cloud-bigtable-emulator-core:0.205.0:0.205.0
+google-cloud-bigtable:2.69.0:2.69.0
+grpc-google-cloud-bigtable-admin-v2:2.69.0:2.69.0
+grpc-google-cloud-bigtable-v2:2.69.0:2.69.0
+proto-google-cloud-bigtable-admin-v2:2.69.0:2.69.0
+proto-google-cloud-bigtable-v2:2.69.0:2.69.0
+google-cloud-bigtable-emulator:0.206.0:0.206.0
+google-cloud-bigtable-emulator-core:0.206.0:0.206.0