diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java
index 0b6d948fe5..b8a514433f 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java
@@ -145,6 +145,62 @@ public static Builder newBuilderForEmulator(String hostname, int port) {
return builder;
}
+ /**
+ * @deprecated OpenCensus support is deprecated and will be removed in a future version
+ * Enables OpenCensus metric aggregations.
+ *
+ * This will register Bigtable client relevant {@link io.opencensus.stats.View}s. When coupled
+ * with an exporter, it allows users to monitor client behavior.
+ *
+ *
Please note that in addition to calling this method, the application must:
+ *
+ * - Include openensus-impl dependency on the classpath
+ *
- Configure an exporter like opencensus-exporter-stats-stackdriver
+ *
+ *
+ * Example usage for maven:
+ *
{@code
+ *
+ * io.opencensus
+ * opencensus-impl
+ * ${opencensus.version}
+ * runtime
+ *
+ *
+ *
+ * io.opencensus
+ * opencensus-exporter-stats-stackdriver
+ * ${opencensus.version}
+ *
+ *
+ *
+ * Java:
+ * {@code
+ * StackdriverStatsExporter.createAndRegister();
+ * BigtableDataSettings.enableOpenCensusStats();
+ * }
+ */
+ @Deprecated
+ public static void enableOpenCensusStats() {
+ com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientViews();
+ // TODO(igorbernstein): Enable grpc views once we upgrade to grpc-java 1.24.0
+ // Required change: https://github.com/grpc/grpc-java/pull/5996
+ // io.opencensus.contrib.grpc.metrics.RpcViews.registerClientGrpcBasicViews();
+ }
+
+ /**
+ * @deprecated OpenCensus support is deprecated and will be removed in a future version Enables
+ * OpenCensus GFE metric aggregations.
+ * This will register views for gfe_latency and gfe_header_missing_count metrics.
+ *
gfe_latency measures the latency between Google's network receives an RPC and reads back
+ * the first byte of the response. gfe_header_missing_count is a counter of the number of RPC
+ * responses received without the server-timing header.
+ */
+ @Deprecated
+ public static void enableGfeOpenCensusStats() {
+ com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientGfeViews();
+ }
+
/**
* Register built in metrics.
*
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
index 42b46ab3b5..5f6b69dea8 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
@@ -28,6 +28,7 @@
import com.google.api.gax.batching.BatcherImpl;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.BackgroundResource;
+import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
@@ -47,6 +48,7 @@
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracerFactory;
+import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.api.gax.tracing.TracedUnaryCallable;
@@ -98,6 +100,8 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
+import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
+import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
@@ -122,6 +126,7 @@
import com.google.cloud.bigtable.data.v2.stub.sql.SqlRowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
@@ -130,6 +135,12 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import io.grpc.MethodDescriptor;
+import io.opencensus.stats.Stats;
+import io.opencensus.stats.StatsRecorder;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tagger;
+import io.opencensus.tags.Tags;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
@@ -215,11 +226,47 @@ public static BigtableClientContext createBigtableClientContext(
public static ApiTracerFactory createBigtableTracerFactory(
EnhancedBigtableStubSettings settings, @Nullable OpenTelemetry openTelemetry)
throws IOException {
+ return createBigtableTracerFactory(
+ settings, Tags.getTagger(), Stats.getStatsRecorder(), openTelemetry);
+ }
- ImmutableList.Builder tracerFactories = ImmutableList.builder();
-
- tracerFactories.add(settings.getTracerFactory());
+ @VisibleForTesting
+ public static ApiTracerFactory createBigtableTracerFactory(
+ EnhancedBigtableStubSettings settings,
+ Tagger tagger,
+ StatsRecorder stats,
+ @Nullable OpenTelemetry openTelemetry)
+ throws IOException {
+ String projectId = settings.getProjectId();
+ String instanceId = settings.getInstanceId();
+ String appProfileId = settings.getAppProfileId();
+
+ ImmutableMap attributes =
+ ImmutableMap.builder()
+ .put(RpcMeasureConstants.BIGTABLE_PROJECT_ID, TagValue.create(projectId))
+ .put(RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(instanceId))
+ .put(RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, TagValue.create(appProfileId))
+ .build();
+ ImmutableList.Builder tracerFactories = ImmutableList.builder();
+ tracerFactories
+ .add(
+ // Add OpenCensus Tracing
+ new OpencensusTracerFactory(
+ ImmutableMap.builder()
+ // Annotate traces with the same tags as metrics
+ .put(RpcMeasureConstants.BIGTABLE_PROJECT_ID.getName(), projectId)
+ .put(RpcMeasureConstants.BIGTABLE_INSTANCE_ID.getName(), instanceId)
+ .put(RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID.getName(), appProfileId)
+ // Also annotate traces with library versions
+ .put("gax", GaxGrpcProperties.getGaxGrpcVersion())
+ .put("grpc", GaxGrpcProperties.getGrpcVersion())
+ .put("gapic", Version.VERSION)
+ .build()))
+ // Add OpenCensus Metrics
+ .add(MetricsTracerFactory.create(tagger, stats, attributes))
+ // Add user configured tracer
+ .add(settings.getTracerFactory());
BuiltinMetricsTracerFactory builtinMetricsTracerFactory =
openTelemetry != null
? BuiltinMetricsTracerFactory.create(openTelemetry, createBuiltinAttributes(settings))
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
index f8f84e651a..13b832b8b1 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
@@ -31,9 +31,9 @@
* This callable will
* -Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC methods upon
* completion. The {@link BigtableTracer} will process metrics that were injected in the
- * header/trailer and publish them. If {@link GrpcResponseMetadata#getMetadata()} returned null,
- * it probably means that the request has never reached GFE, and it'll increment the
- * gfe_header_missing_counter in this case.
+ * header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
+ * returned null, it probably means that the request has never reached GFE, and it'll increment
+ * the gfe_header_missing_counter in this case.
* -This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
* -Call {@link BigtableTracer#onRequest(int)} to record the request events in a stream.
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
index 8bf31dfc9d..37ba74bfdb 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
@@ -30,9 +30,9 @@
* This callable will:
* - Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC methods upon
* completion. The {@link BigtableTracer} will process metrics that were injected in the
- * header/trailer and publish them. If {@link GrpcResponseMetadata#getMetadata()} returned null,
- * it probably means that the request has never reached GFE, and it'll increment the
- * gfe_header_missing_counter in this case.
+ * header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
+ * returned null, it probably means that the request has never reached GFE, and it'll increment
+ * the gfe_header_missing_counter in this case.
* -This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
* -This class will also inject a {@link BigtableGrpcStreamTracer} that'll record the time an
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
index c9f33c950b..1f95224185 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
@@ -122,8 +122,8 @@ static TransportAttrs create(@Nullable String locality, @Nullable String backend
private TransportAttrs transportAttrs = null;
- // Server histogram buckets use [start, end), however OpenTelemetry uses (start, end]. To work
- // around this, we measure all the latencies in nanoseconds and convert them
+ // OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start,
+ // end]. To work around this, we measure all the latencies in nanoseconds and convert them
// to milliseconds and use DoubleHistogram. This should minimize the chance of a data
// point fall on the bucket boundary that causes off by one errors.
private final DoubleHistogram operationLatenciesHistogram;
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
new file mode 100644
index 0000000000..c322b75df8
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeDuration;
+
+import com.google.api.core.ObsoleteApi;
+import com.google.api.gax.retrying.ServerStreamingAttemptException;
+import com.google.api.gax.tracing.ApiTracerFactory.OperationType;
+import com.google.api.gax.tracing.SpanName;
+import com.google.common.base.Stopwatch;
+import io.opencensus.stats.MeasureMap;
+import io.opencensus.stats.StatsRecorder;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagContextBuilder;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tagger;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+
+class MetricsTracer extends BigtableTracer {
+
+ private final OperationType operationType;
+
+ private final Tagger tagger;
+ private final StatsRecorder stats;
+
+ // Tags
+ private final TagContext parentContext;
+ private final SpanName spanName;
+ private final Map statsAttributes;
+
+ // Operation level metrics
+ private final AtomicBoolean opFinished = new AtomicBoolean();
+ private final Stopwatch operationTimer = Stopwatch.createStarted();
+ private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();
+ private long operationResponseCount = 0;
+
+ // Attempt level metrics
+ private int attemptCount = 0;
+ private Stopwatch attemptTimer;
+ private long attemptResponseCount = 0;
+
+ private volatile int attempt = 0;
+
+ private volatile boolean reportBatchingLatency = false;
+ private volatile long batchThrottledLatency = 0;
+
+ MetricsTracer(
+ OperationType operationType,
+ Tagger tagger,
+ StatsRecorder stats,
+ SpanName spanName,
+ Map statsAttributes) {
+ this.operationType = operationType;
+ this.tagger = tagger;
+ this.stats = stats;
+ this.parentContext = tagger.getCurrentTagContext();
+ this.spanName = spanName;
+ this.statsAttributes = statsAttributes;
+ }
+
+ @Override
+ public Scope inScope() {
+ return new Scope() {
+ @Override
+ public void close() {}
+ };
+ }
+
+ @Override
+ public void operationFinishEarly() {
+ attemptTimer.stop();
+ operationTimer.stop();
+ }
+
+ @Override
+ public void operationSucceeded() {
+ recordOperationCompletion(null);
+ }
+
+ @Override
+ public void operationCancelled() {
+ recordOperationCompletion(new CancellationException());
+ }
+
+ @Override
+ public void operationFailed(Throwable throwable) {
+ recordOperationCompletion(throwable);
+ }
+
+ private void recordOperationCompletion(@Nullable Throwable throwable) {
+ if (!opFinished.compareAndSet(false, true)) {
+ return;
+ }
+
+ long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS);
+
+ MeasureMap measures =
+ stats
+ .newMeasureMap()
+ .put(RpcMeasureConstants.BIGTABLE_OP_LATENCY, elapsed)
+ .put(RpcMeasureConstants.BIGTABLE_OP_ATTEMPT_COUNT, attemptCount);
+
+ if (operationType == OperationType.ServerStreaming
+ && spanName.getMethodName().equals("ReadRows")) {
+ measures.put(
+ RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY,
+ firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS));
+ }
+
+ TagContextBuilder tagCtx =
+ newTagCtxBuilder()
+ .putLocal(
+ RpcMeasureConstants.BIGTABLE_STATUS,
+ TagValue.create(Util.extractStatus(throwable)));
+
+ measures.record(tagCtx.build());
+ }
+
+ @Override
+ public void attemptStarted(int attemptNumber) {
+ attempt = attemptNumber;
+ attemptCount++;
+ attemptTimer = Stopwatch.createStarted();
+ attemptResponseCount = 0;
+ }
+
+ @Override
+ public void attemptSucceeded() {
+ recordAttemptCompletion(null);
+ }
+
+ @Override
+ public void attemptCancelled() {
+ recordAttemptCompletion(new CancellationException());
+ }
+
+ /**
+ * This method is obsolete. Use {@link #attemptFailedDuration(Throwable, java.time.Duration)}
+ * instead.
+ */
+ @ObsoleteApi("Use attemptFailedDuration(Throwable, java.time.Duration) instead")
+ @Override
+ public void attemptFailed(Throwable error, org.threeten.bp.Duration delay) {
+ attemptFailedDuration(error, toJavaTimeDuration(delay));
+ }
+
+ @Override
+ public void attemptFailedDuration(Throwable throwable, java.time.Duration duration) {
+ recordAttemptCompletion(throwable);
+ }
+
+ @Override
+ public void attemptFailedRetriesExhausted(Throwable throwable) {
+ recordAttemptCompletion(throwable);
+ }
+
+ @Override
+ public void attemptPermanentFailure(Throwable throwable) {
+ recordAttemptCompletion(throwable);
+ }
+
+ private void recordAttemptCompletion(@Nullable Throwable throwable) {
+ MeasureMap measures =
+ stats
+ .newMeasureMap()
+ .put(
+ RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY,
+ attemptTimer.elapsed(TimeUnit.MILLISECONDS));
+
+ if (reportBatchingLatency) {
+ measures.put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, batchThrottledLatency);
+
+ // Reset batch throttling latency for next attempt. This can't be done in attemptStarted
+ // because batching flow control will add batching latency before the attempt has started.
+ batchThrottledLatency = 0;
+ }
+
+ // Patch the throwable until it's fixed in gax. When an attempt failed,
+ // it'll throw a ServerStreamingAttemptException. Unwrap the exception
+ // so it could get processed by extractStatus
+ if (throwable instanceof ServerStreamingAttemptException) {
+ throwable = throwable.getCause();
+ }
+
+ TagContextBuilder tagCtx =
+ newTagCtxBuilder()
+ .putLocal(
+ RpcMeasureConstants.BIGTABLE_STATUS,
+ TagValue.create(Util.extractStatus(throwable)));
+
+ measures.record(tagCtx.build());
+ }
+
+ @Override
+ public void responseReceived() {
+ if (firstResponsePerOpTimer.isRunning()) {
+ firstResponsePerOpTimer.stop();
+ }
+ attemptResponseCount++;
+ operationResponseCount++;
+ }
+
+ @Override
+ public int getAttempt() {
+ return attempt;
+ }
+
+ @Override
+ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
+ MeasureMap measures = stats.newMeasureMap();
+ if (latency != null) {
+ measures
+ .put(RpcMeasureConstants.BIGTABLE_GFE_LATENCY, latency)
+ .put(RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT, 0L);
+ } else {
+ measures.put(RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT, 1L);
+ }
+ measures.record(
+ newTagCtxBuilder()
+ .putLocal(
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create(Util.extractStatus(throwable)))
+ .build());
+ }
+
+ @Override
+ public void batchRequestThrottled(long totalThrottledMs) {
+ reportBatchingLatency = true;
+ batchThrottledLatency += totalThrottledMs;
+ }
+
+ private TagContextBuilder newTagCtxBuilder() {
+ TagContextBuilder tagCtx =
+ tagger.toBuilder(parentContext)
+ .putLocal(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(spanName.toString()));
+
+ // Copy client level tags in
+ for (Entry entry : statsAttributes.entrySet()) {
+ tagCtx.putLocal(entry.getKey(), entry.getValue());
+ }
+
+ return tagCtx;
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java
new file mode 100644
index 0000000000..e0c173a2be
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.tracing.ApiTracer;
+import com.google.api.gax.tracing.ApiTracerFactory;
+import com.google.api.gax.tracing.BaseApiTracerFactory;
+import com.google.api.gax.tracing.SpanName;
+import com.google.common.collect.ImmutableMap;
+import io.opencensus.stats.StatsRecorder;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tagger;
+
+/**
+ * {@link ApiTracerFactory} that will generate OpenCensus metrics by using the {@link ApiTracer}
+ * api.
+ */
+@InternalApi("For internal use only")
+public class MetricsTracerFactory extends BaseApiTracerFactory {
+ private final Tagger tagger;
+ private final StatsRecorder stats;
+ private final ImmutableMap statsAttributes;
+
+ public static MetricsTracerFactory create(
+ Tagger tagger, StatsRecorder stats, ImmutableMap statsAttributes) {
+ return new MetricsTracerFactory(tagger, stats, statsAttributes);
+ }
+
+ private MetricsTracerFactory(
+ Tagger tagger, StatsRecorder stats, ImmutableMap statsAttributes) {
+ this.tagger = tagger;
+ this.stats = stats;
+ this.statsAttributes = statsAttributes;
+ }
+
+ @Override
+ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) {
+ return new MetricsTracer(operationType, tagger, stats, spanName, statsAttributes);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java
new file mode 100644
index 0000000000..560bb084bf
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.core.InternalApi;
+import io.opencensus.stats.Measure.MeasureLong;
+import io.opencensus.tags.TagKey;
+
+@InternalApi("For internal use only")
+public class RpcMeasureConstants {
+ // TagKeys
+ public static final TagKey BIGTABLE_PROJECT_ID = TagKey.create("bigtable_project_id");
+ public static final TagKey BIGTABLE_INSTANCE_ID = TagKey.create("bigtable_instance_id");
+ public static final TagKey BIGTABLE_APP_PROFILE_ID = TagKey.create("bigtable_app_profile_id");
+
+ /** Tag key that represents a Bigtable operation name. */
+ static final TagKey BIGTABLE_OP = TagKey.create("bigtable_op");
+
+ /** Tag key that represents the final status of the Bigtable operation. */
+ static final TagKey BIGTABLE_STATUS = TagKey.create("bigtable_status");
+
+ // Units
+ /** Unit to represent counts. */
+ private static final String COUNT = "1";
+
+ /** Unit to represent milliseconds. */
+ private static final String MILLISECOND = "ms";
+
+ // Measurements
+ /**
+ * Latency for a logic operation, which will include latencies for each attempt and exponential
+ * backoff delays.
+ */
+ static final MeasureLong BIGTABLE_OP_LATENCY =
+ MeasureLong.create(
+ "cloud.google.com/java/bigtable/op_latency",
+ "Time between request being sent to last row received, "
+ + "or terminal error of the last retry attempt.",
+ MILLISECOND);
+
+ /**
+ * Number of attempts a logical operation took to complete. Under normal circumstances should be
+ * 1.
+ */
+ static final MeasureLong BIGTABLE_OP_ATTEMPT_COUNT =
+ MeasureLong.MeasureLong.create(
+ "cloud.google.com/java/bigtable/op_attempt_count",
+ "Number of attempts per operation",
+ COUNT);
+
+ /** Latency that a single attempt (RPC) took to complete. */
+ static final MeasureLong BIGTABLE_ATTEMPT_LATENCY =
+ MeasureLong.create(
+ "cloud.google.com/java/bigtable/attempt_latency",
+ "Duration of an individual operation attempt",
+ MILLISECOND);
+
+ /** Latency for the caller to see the first row in a ReadRows stream. */
+ static final MeasureLong BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY =
+ MeasureLong.create(
+ "cloud.google.com/java/bigtable/read_rows_first_row_latency",
+ "Time between request being sent to the first row received",
+ MILLISECOND);
+
+ /** GFE t4t7 latency extracted from server-timing header. */
+ public static final MeasureLong BIGTABLE_GFE_LATENCY =
+ MeasureLong.create(
+ "cloud.google.com/java/bigtable/gfe_latency",
+ "Latency between Google's network receives an RPC and reads back the first byte of the"
+ + " response",
+ MILLISECOND);
+
+ /** Number of responses without the server-timing header. */
+ public static final MeasureLong BIGTABLE_GFE_HEADER_MISSING_COUNT =
+ MeasureLong.create(
+ "cloud.google.com/java/bigtable/gfe_header_missing_count",
+ "Number of RPC responses received without the server-timing header, most likely means"
+ + " that the RPC never reached Google's network",
+ COUNT);
+
+ /** Total throttled time of a batch in msecs. */
+ public static final MeasureLong BIGTABLE_BATCH_THROTTLED_TIME =
+ MeasureLong.create(
+ "cloud.google.com/java/bigtable/batch_throttled_time",
+ "Total throttled time of a batch in msecs",
+ MILLISECOND);
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java
new file mode 100644
index 0000000000..4e21eaf785
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP_ATTEMPT_COUNT;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP_LATENCY;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_PROJECT_ID;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_STATUS;
+
+import com.google.common.collect.ImmutableList;
+import io.opencensus.stats.Aggregation;
+import io.opencensus.stats.Aggregation.Count;
+import io.opencensus.stats.Aggregation.Distribution;
+import io.opencensus.stats.Aggregation.Sum;
+import io.opencensus.stats.BucketBoundaries;
+import io.opencensus.stats.View;
+import java.util.Arrays;
+
+class RpcViewConstants {
+ // Aggregations
+ private static final Aggregation COUNT = Count.create();
+ private static final Aggregation SUM = Sum.create();
+
+ private static final Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM =
+ Distribution.create(
+ BucketBoundaries.create(
+ ImmutableList.of(
+ 0.0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0,
+ 13.0, 16.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0,
+ 250.0, 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0,
+ 20000.0, 50000.0, 100000.0)));
+
+ private static final Aggregation AGGREGATION_ATTEMPT_COUNT =
+ Distribution.create(
+ BucketBoundaries.create(
+ ImmutableList.of(
+ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 15.0, 20.0, 30.0, 40.0, 50.0,
+ 100.0)));
+
+ private static final Aggregation AGGREGATION_WITH_POWERS_OF_2 =
+ Distribution.create(
+ BucketBoundaries.create(
+ ImmutableList.of(
+ 0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0, 2048.0,
+ 4096.0, 8192.0, 16384.0, 32768.0, 65536.0, 131072.0, 262144.0, 524288.0,
+ 1048576.0, 2097152.0)));
+
+ /**
+ * {@link View} for Bigtable client roundtrip latency in milliseconds including all retry
+ * attempts.
+ */
+ static final View BIGTABLE_OP_LATENCY_VIEW =
+ View.create(
+ View.Name.create("cloud.google.com/java/bigtable/op_latency"),
+ "Operation latency in msecs",
+ BIGTABLE_OP_LATENCY,
+ AGGREGATION_WITH_MILLIS_HISTOGRAM,
+ ImmutableList.of(
+ BIGTABLE_PROJECT_ID,
+ BIGTABLE_INSTANCE_ID,
+ BIGTABLE_APP_PROFILE_ID,
+ BIGTABLE_OP,
+ BIGTABLE_STATUS));
+
+ static final View BIGTABLE_COMPLETED_OP_VIEW =
+ View.create(
+ View.Name.create("cloud.google.com/java/bigtable/completed_ops"),
+ "Number of completed Bigtable client operations",
+ BIGTABLE_OP_LATENCY,
+ COUNT,
+ Arrays.asList(
+ BIGTABLE_PROJECT_ID,
+ BIGTABLE_INSTANCE_ID,
+ BIGTABLE_APP_PROFILE_ID,
+ BIGTABLE_OP,
+ BIGTABLE_STATUS));
+
+ static final View BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW =
+ View.create(
+ View.Name.create("cloud.google.com/java/bigtable/read_rows_first_row_latency"),
+ "Latency to receive the first row in a ReadRows stream",
+ BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY,
+ AGGREGATION_WITH_MILLIS_HISTOGRAM,
+ ImmutableList.of(BIGTABLE_PROJECT_ID, BIGTABLE_INSTANCE_ID, BIGTABLE_APP_PROFILE_ID));
+
+ static final View BIGTABLE_ATTEMPT_LATENCY_VIEW =
+ View.create(
+ View.Name.create("cloud.google.com/java/bigtable/attempt_latency"),
+ "Attempt latency in msecs",
+ BIGTABLE_ATTEMPT_LATENCY,
+ AGGREGATION_WITH_MILLIS_HISTOGRAM,
+ ImmutableList.of(
+ BIGTABLE_PROJECT_ID,
+ BIGTABLE_INSTANCE_ID,
+ BIGTABLE_APP_PROFILE_ID,
+ BIGTABLE_OP,
+ BIGTABLE_STATUS));
+
+ static final View BIGTABLE_ATTEMPTS_PER_OP_VIEW =
+ View.create(
+ View.Name.create("cloud.google.com/java/bigtable/attempts_per_op"),
+ "Distribution of attempts per logical operation",
+ BIGTABLE_OP_ATTEMPT_COUNT,
+ AGGREGATION_ATTEMPT_COUNT,
+ ImmutableList.of(
+ BIGTABLE_PROJECT_ID,
+ BIGTABLE_INSTANCE_ID,
+ BIGTABLE_APP_PROFILE_ID,
+ BIGTABLE_OP,
+ BIGTABLE_STATUS));
+
+ static final View BIGTABLE_GFE_LATENCY_VIEW =
+ View.create(
+ View.Name.create("cloud.google.com/java/bigtable/gfe_latency"),
+ "Latency between Google's network receives an RPC and reads back the first byte of the"
+ + " response",
+ BIGTABLE_GFE_LATENCY,
+ AGGREGATION_WITH_MILLIS_HISTOGRAM,
+ ImmutableList.of(
+ BIGTABLE_INSTANCE_ID,
+ BIGTABLE_PROJECT_ID,
+ BIGTABLE_APP_PROFILE_ID,
+ BIGTABLE_OP,
+ BIGTABLE_STATUS));
+
+ static final View BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW =
+ View.create(
+ View.Name.create("cloud.google.com/java/bigtable/gfe_header_missing_count"),
+ "Number of RPC responses received without the server-timing header, most likely means"
+ + " that the RPC never reached Google's network",
+ BIGTABLE_GFE_HEADER_MISSING_COUNT,
+ SUM,
+ ImmutableList.of(
+ BIGTABLE_INSTANCE_ID,
+ BIGTABLE_PROJECT_ID,
+ BIGTABLE_APP_PROFILE_ID,
+ BIGTABLE_OP,
+ BIGTABLE_STATUS));
+
+ // use distribution so we can correlate batch throttled time with op_latency
+ static final View BIGTABLE_BATCH_THROTTLED_TIME_VIEW =
+ View.create(
+ View.Name.create("cloud.google.com/java/bigtable/batch_throttled_time"),
+ "Total throttled time of a batch in msecs",
+ BIGTABLE_BATCH_THROTTLED_TIME,
+ AGGREGATION_WITH_MILLIS_HISTOGRAM,
+ ImmutableList.of(
+ BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP));
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java
new file mode 100644
index 0000000000..e8902108aa
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import io.opencensus.stats.Stats;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewManager;
+
+@Deprecated
+public class RpcViews {
+ @VisibleForTesting
+ private static final ImmutableSet BIGTABLE_CLIENT_VIEWS_SET =
+ ImmutableSet.of(
+ RpcViewConstants.BIGTABLE_OP_LATENCY_VIEW,
+ RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW,
+ RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW,
+ RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
+ RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW,
+ RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW);
+
+ private static final ImmutableSet GFE_VIEW_SET =
+ ImmutableSet.of(
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW);
+
+ private static boolean gfeMetricsRegistered = false;
+
+ /** Registers all Bigtable specific views. */
+ public static void registerBigtableClientViews() {
+ registerBigtableClientViews(Stats.getViewManager());
+ }
+
+ /**
+ * Register views for GFE metrics, including gfe_latency and gfe_header_missing_count. gfe_latency
+ * measures the latency between Google's network receives an RPC and reads back the first byte of
+ * the response. gfe_header_missing_count is a counter of the number of RPC responses without a
+ * server-timing header.
+ */
+ public static void registerBigtableClientGfeViews() {
+ registerBigtableClientGfeViews(Stats.getViewManager());
+ }
+
+ @VisibleForTesting
+ static void registerBigtableClientViews(ViewManager viewManager) {
+ for (View view : BIGTABLE_CLIENT_VIEWS_SET) {
+ viewManager.registerView(view);
+ }
+ }
+
+ @VisibleForTesting
+ static void registerBigtableClientGfeViews(ViewManager viewManager) {
+ for (View view : GFE_VIEW_SET) {
+ viewManager.registerView(view);
+ }
+ gfeMetricsRegistered = true;
+ }
+
+ static boolean isGfeMetricsRegistered() {
+ return gfeMetricsRegistered;
+ }
+
+ @VisibleForTesting
+ static void setGfeMetricsRegistered(boolean gfeMetricsRegistered) {
+ RpcViews.gfeMetricsRegistered = gfeMetricsRegistered;
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
index b0b40054d5..093d7800d3 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java
@@ -43,6 +43,7 @@
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
+import io.opencensus.tags.TagValue;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
@@ -56,11 +57,13 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
-/** Utilities to help with metrics. */
+/** Utilities to help integrating with OpenCensus. */
@InternalApi("For internal use only")
public class Util {
static final Metadata.Key ATTEMPT_HEADER_KEY =
@@ -74,7 +77,7 @@ public class Util {
static final Metadata.Key LOCATION_METADATA_KEY =
Metadata.Key.of("x-goog-ext-425905942-bin", Metadata.BINARY_BYTE_MARSHALLER);
- /** Convert an exception into a string. */
+ /** Convert an exception into a value that can be used to create an OpenCensus tag value. */
static String extractStatus(@Nullable Throwable error) {
final String statusString;
@@ -95,6 +98,26 @@ static String extractStatus(@Nullable Throwable error) {
return statusString;
}
+ /**
+ * Await the result of the future and convert it into a value that can be used as an OpenCensus
+ * tag value.
+ */
+ static TagValue extractStatusFromFuture(Future> future) {
+ Throwable error = null;
+
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ error = e;
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ error = e.getCause();
+ } catch (RuntimeException e) {
+ error = e;
+ }
+ return TagValue.create(extractStatus(error));
+ }
+
static String extractTableId(Object request) {
String tableName = null;
String authorizedViewName = null;
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
index c617274cae..e2bbc1bedb 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
@@ -36,6 +36,7 @@
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FailedPreconditionException;
@@ -66,6 +67,7 @@
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.RowSet;
+import com.google.cloud.bigtable.Version;
import com.google.cloud.bigtable.admin.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
@@ -114,15 +116,24 @@
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
+import io.grpc.internal.GrpcUtil;
import io.grpc.stub.StreamObserver;
+import io.opencensus.common.Scope;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.Tracing;
+import io.opencensus.trace.export.SpanData;
+import io.opencensus.trace.export.SpanExporter.Handler;
+import io.opencensus.trace.samplers.Samplers;
import java.io.IOException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.Base64;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -545,6 +556,61 @@ public void testUserAgent() throws InterruptedException {
.containsMatch("bigtable-java/\\d+\\.\\d+\\.\\d+(?:-SNAPSHOT)?");
}
+ @Test
+ public void testSpanAttributes() throws InterruptedException {
+ final BlockingQueue spans = new ArrayBlockingQueue<>(100);
+
+ // inject a temporary trace exporter
+ String handlerName = "stub-test-exporter";
+
+ Tracing.getExportComponent()
+ .getSpanExporter()
+ .registerHandler(
+ handlerName,
+ new Handler() {
+ @Override
+ public void export(Collection collection) {
+ spans.addAll(collection);
+ }
+ });
+
+ SpanData foundSpanData = null;
+ // Issue the rpc and grab the span
+ try {
+ try (Scope ignored =
+ Tracing.getTracer()
+ .spanBuilder("fake-parent-span")
+ .setSampler(Samplers.alwaysSample())
+ .startScopedSpan()) {
+ enhancedBigtableStub.readRowCallable().call(Query.create("table-id").rowKey("row-key"));
+ }
+
+ for (int i = 0; i < 100; i++) {
+ SpanData spanData = spans.poll(10, TimeUnit.SECONDS);
+ if ("Bigtable.ReadRow".equals(spanData.getName())) {
+ foundSpanData = spanData;
+ break;
+ }
+ }
+ } finally {
+ // cleanup
+ Tracing.getExportComponent().getSpanExporter().unregisterHandler(handlerName);
+ }
+
+ // Examine the caught span
+ assertThat(foundSpanData).isNotNull();
+ assertThat(foundSpanData.getAttributes().getAttributeMap())
+ .containsEntry("gapic", AttributeValue.stringAttributeValue(Version.VERSION));
+ assertThat(foundSpanData.getAttributes().getAttributeMap())
+ .containsEntry(
+ "grpc",
+ AttributeValue.stringAttributeValue(
+ GrpcUtil.getGrpcBuildVersion().getImplementationVersion()));
+ assertThat(foundSpanData.getAttributes().getAttributeMap())
+ .containsEntry(
+ "gax", AttributeValue.stringAttributeValue(GaxGrpcProperties.getGaxGrpcVersion()));
+ }
+
@Test
public void testBulkMutationFlowControllerConfigured() throws Exception {
BigtableDataSettings.Builder settings =
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java
new file mode 100644
index 0000000000..b0966a2166
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java
@@ -0,0 +1,473 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.api.gax.rpc.ClientContext;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase;
+import com.google.bigtable.v2.CheckAndMutateRowRequest;
+import com.google.bigtable.v2.CheckAndMutateRowResponse;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.MutateRowsResponse;
+import com.google.bigtable.v2.ReadModifyWriteRowRequest;
+import com.google.bigtable.v2.ReadModifyWriteRowResponse;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.bigtable.v2.SampleRowKeysResponse;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
+import com.google.cloud.bigtable.data.v2.internal.NameUtil;
+import com.google.cloud.bigtable.data.v2.models.BulkMutation;
+import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
+import com.google.cloud.bigtable.data.v2.models.Mutation;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.models.SampleRowKeysRequest;
+import com.google.cloud.bigtable.data.v2.models.TableId;
+import com.google.cloud.bigtable.data.v2.stub.BigtableClientContext;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
+import com.google.common.collect.ImmutableMap;
+import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
+import io.grpc.Metadata;
+import io.grpc.Server;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import io.opencensus.stats.StatsComponent;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tags;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BigtableTracerCallableTest {
+ private Server server;
+ private Server serverNoHeader;
+
+ private FakeService fakeService = new FakeService();
+
+ private final StatsComponent localStats = new SimpleStatsComponent();
+ private EnhancedBigtableStub stub;
+ private EnhancedBigtableStub noHeaderStub;
+ private int attempts;
+
+ private static final String PROJECT_ID = "fake-project";
+ private static final String INSTANCE_ID = "fake-instance";
+ private static final String APP_PROFILE_ID = "default";
+ private static final String TABLE_ID = "fake-table";
+
+ private static final long WAIT_FOR_METRICS_TIME_MS = 1_000;
+
+ private AtomicInteger fakeServerTiming;
+
+ @Before
+ public void setUp() throws Exception {
+ RpcViews.registerBigtableClientGfeViews(localStats.getViewManager());
+
+ // Create a server that'll inject a server-timing header with a random number and a stub that
+ // connects to this server.
+ fakeServerTiming = new AtomicInteger(new Random().nextInt(1000) + 1);
+ server =
+ FakeServiceBuilder.create(fakeService)
+ .intercept(
+ new ServerInterceptor() {
+ @Override
+ public ServerCall.Listener interceptCall(
+ ServerCall serverCall,
+ Metadata metadata,
+ ServerCallHandler serverCallHandler) {
+ return serverCallHandler.startCall(
+ new SimpleForwardingServerCall(serverCall) {
+ @Override
+ public void sendHeaders(Metadata headers) {
+ headers.put(
+ Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER),
+ String.format("gfet4t7; dur=%d", fakeServerTiming.get()));
+ super.sendHeaders(headers);
+ }
+ },
+ metadata);
+ }
+ })
+ .start();
+
+ BigtableDataSettings settings =
+ BigtableDataSettings.newBuilderForEmulator(server.getPort())
+ .setProjectId(PROJECT_ID)
+ .setInstanceId(INSTANCE_ID)
+ .setAppProfileId(APP_PROFILE_ID)
+ .build();
+
+ BigtableClientContext bigtableClientContext =
+ EnhancedBigtableStub.createBigtableClientContext(settings.getStubSettings());
+ ClientContext clientContext =
+ bigtableClientContext.getClientContext().toBuilder()
+ .setTracerFactory(
+ EnhancedBigtableStub.createBigtableTracerFactory(
+ settings.getStubSettings(),
+ Tags.getTagger(),
+ localStats.getStatsRecorder(),
+ null))
+ .build();
+ attempts = settings.getStubSettings().readRowsSettings().getRetrySettings().getMaxAttempts();
+ stub = new EnhancedBigtableStub(settings.getStubSettings(), clientContext);
+
+ // Create another server without injecting the server-timing header and another stub that
+ // connects to it.
+ serverNoHeader = FakeServiceBuilder.create(fakeService).start();
+
+ BigtableDataSettings noHeaderSettings =
+ BigtableDataSettings.newBuilderForEmulator(serverNoHeader.getPort())
+ .setProjectId(PROJECT_ID)
+ .setInstanceId(INSTANCE_ID)
+ .setAppProfileId(APP_PROFILE_ID)
+ .build();
+
+ BigtableClientContext noHeaderBigtableClientContext =
+ EnhancedBigtableStub.createBigtableClientContext(noHeaderSettings.getStubSettings());
+ ClientContext noHeaderClientContext =
+ noHeaderBigtableClientContext.getClientContext().toBuilder()
+ .setTracerFactory(
+ EnhancedBigtableStub.createBigtableTracerFactory(
+ noHeaderSettings.getStubSettings(),
+ Tags.getTagger(),
+ localStats.getStatsRecorder(),
+ null))
+ .build();
+ noHeaderStub =
+ new EnhancedBigtableStub(noHeaderSettings.getStubSettings(), noHeaderClientContext);
+ }
+
+ @After
+ public void tearDown() {
+ stub.close();
+ noHeaderStub.close();
+ server.shutdown();
+ serverNoHeader.shutdown();
+ }
+
+ @Test
+ public void testGFELatencyMetricReadRows() {
+ ServerStream> call = stub.readRowsCallable().call(Query.create(TABLE_ID));
+ call.forEach(r -> {});
+
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFELatencyMetricMutateRow() throws InterruptedException {
+ stub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "fake-key"));
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFELatencyMetricMutateRows() throws InterruptedException {
+ BulkMutation mutations =
+ BulkMutation.create(TABLE_ID)
+ .add("key", Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value"));
+ stub.bulkMutateRowsCallable().call(mutations);
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFELatencySampleRowKeys() throws InterruptedException {
+ stub.sampleRowKeysCallable().call(TABLE_ID);
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.SampleRowKeys"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFELatencySampleRowKeysWithRequest() throws InterruptedException {
+ stub.sampleRowKeysCallableWithRequest().call(SampleRowKeysRequest.create(TableId.of(TABLE_ID)));
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.SampleRowKeys"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFELatencyCheckAndMutateRow() throws InterruptedException {
+ ConditionalRowMutation mutation =
+ ConditionalRowMutation.create(TABLE_ID, "fake-key")
+ .then(Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value"));
+ stub.checkAndMutateRowCallable().call(mutation);
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.CheckAndMutateRow"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFELatencyReadModifyWriteRow() throws InterruptedException {
+ ReadModifyWriteRow request =
+ ReadModifyWriteRow.create(TABLE_ID, "fake-key")
+ .append("fake-family", "fake-qualifier", "suffix");
+ stub.readModifyWriteRowCallable().call(request);
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadModifyWriteRow"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFEMissingHeaderMetric() throws InterruptedException {
+ // Make a few calls to the server which will inject the server-timing header and the counter
+ // should be 0.
+ stub.readRowsCallable().call(Query.create(TABLE_ID));
+ stub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "key"));
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ long mutateRowMissingCount =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP,
+ TagValue.create("Bigtable.MutateRow"),
+ RpcMeasureConstants.BIGTABLE_STATUS,
+ TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ long readRowsMissingCount =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+
+ assertThat(mutateRowMissingCount).isEqualTo(0);
+ assertThat(readRowsMissingCount).isEqualTo(0);
+
+ // Make a few more calls to the server which won't add the header and the counter should match
+ // the number of requests sent.
+ int readRowsCalls = new Random().nextInt(10) + 1;
+ int mutateRowCalls = new Random().nextInt(10) + 1;
+ for (int i = 0; i < mutateRowCalls; i++) {
+ noHeaderStub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "fake-key" + i));
+ }
+ for (int i = 0; i < readRowsCalls; i++) {
+ noHeaderStub.readRowsCallable().call(Query.create(TABLE_ID));
+ }
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+
+ mutateRowMissingCount =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP,
+ TagValue.create("Bigtable.MutateRow"),
+ RpcMeasureConstants.BIGTABLE_STATUS,
+ TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ readRowsMissingCount =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP,
+ TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS,
+ TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+
+ assertThat(mutateRowMissingCount).isEqualTo(mutateRowCalls);
+ assertThat(readRowsMissingCount).isEqualTo(readRowsCalls);
+ }
+
+ @Test
+ public void testMetricsWithErrorResponse() throws InterruptedException {
+ try {
+ stub.readRowsCallable().call(Query.create("random-table-id")).iterator().next();
+ fail("readrows should throw exception");
+ } catch (Exception e) {
+ assertThat(e).isInstanceOf(UnavailableException.class);
+ }
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ long missingCount =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP,
+ TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS,
+ TagValue.create("UNAVAILABLE")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(missingCount).isEqualTo(attempts);
+ }
+
+ private class FakeService extends BigtableImplBase {
+ private final String defaultTableName =
+ NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID);
+
+ @Override
+ public void readRows(ReadRowsRequest request, StreamObserver observer) {
+ if (!request.getTableName().equals(defaultTableName)) {
+ observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ return;
+ }
+ observer.onNext(ReadRowsResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void mutateRow(MutateRowRequest request, StreamObserver observer) {
+ observer.onNext(MutateRowResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void mutateRows(MutateRowsRequest request, StreamObserver observer) {
+ MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
+ for (int i = 0; i < request.getEntriesCount(); i++) {
+ builder.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(i));
+ }
+ observer.onNext(builder.build());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void sampleRowKeys(
+ com.google.bigtable.v2.SampleRowKeysRequest request,
+ StreamObserver observer) {
+ observer.onNext(SampleRowKeysResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void checkAndMutateRow(
+ CheckAndMutateRowRequest request, StreamObserver observer) {
+ observer.onNext(CheckAndMutateRowResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void readModifyWriteRow(
+ ReadModifyWriteRowRequest request, StreamObserver observer) {
+ observer.onNext(ReadModifyWriteRowResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java
new file mode 100644
index 0000000000..fa09ba2c11
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java
@@ -0,0 +1,481 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.batching.Batcher;
+import com.google.api.gax.batching.BatcherImpl;
+import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.grpc.GrpcCallContext;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ClientContext;
+import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.MutateRowsResponse;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.bigtable.v2.ReadRowsResponse.CellChunk;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
+import com.google.cloud.bigtable.data.v2.models.BulkMutation;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.cloud.bigtable.data.v2.stub.BigtableClientContext;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
+import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.StringValue;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import io.opencensus.stats.StatsComponent;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tags;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.stubbing.Answer;
+
+@RunWith(JUnit4.class)
+public class MetricsTracerTest {
+ private static final String PROJECT_ID = "fake-project";
+ private static final String INSTANCE_ID = "fake-instance";
+ private static final String APP_PROFILE_ID = "default";
+ private static final String TABLE_ID = "fake-table";
+ private static final long SLEEP_VARIABILITY = 15;
+
+ private static final ReadRowsResponse DEFAULT_READ_ROWS_RESPONSES =
+ ReadRowsResponse.newBuilder()
+ .addChunks(
+ CellChunk.newBuilder()
+ .setRowKey(ByteString.copyFromUtf8("fake-key"))
+ .setFamilyName(StringValue.of("cf"))
+ .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")))
+ .setTimestampMicros(1_000)
+ .setValue(ByteString.copyFromUtf8("value"))
+ .setCommitRow(true))
+ .build();
+
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ private Server server;
+
+ @Mock(answer = Answers.CALLS_REAL_METHODS)
+ private BigtableGrpc.BigtableImplBase mockService;
+
+ private final StatsComponent localStats = new SimpleStatsComponent();
+ private EnhancedBigtableStub stub;
+ private BigtableDataSettings settings;
+
+ @Before
+ public void setUp() throws Exception {
+ server = FakeServiceBuilder.create(mockService).start();
+
+ RpcViews.registerBigtableClientViews(localStats.getViewManager());
+
+ settings =
+ BigtableDataSettings.newBuilderForEmulator(server.getPort())
+ .setProjectId(PROJECT_ID)
+ .setInstanceId(INSTANCE_ID)
+ .setAppProfileId(APP_PROFILE_ID)
+ .build();
+
+ BigtableClientContext bigtableClientContext =
+ EnhancedBigtableStub.createBigtableClientContext(settings.getStubSettings());
+ ClientContext clientContext =
+ bigtableClientContext.getClientContext().toBuilder()
+ .setTracerFactory(
+ EnhancedBigtableStub.createBigtableTracerFactory(
+ settings.getStubSettings(),
+ Tags.getTagger(),
+ localStats.getStatsRecorder(),
+ null))
+ .build();
+ stub = new EnhancedBigtableStub(settings.getStubSettings(), clientContext);
+ }
+
+ @After
+ public void tearDown() {
+ stub.close();
+ server.shutdown();
+ }
+
+ @Test
+ public void testReadRowsLatency() throws InterruptedException {
+ final long sleepTime = 50;
+
+ doAnswer(
+ new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ @SuppressWarnings("unchecked")
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ Thread.sleep(sleepTime);
+ observer.onNext(DEFAULT_READ_ROWS_RESPONSES);
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(mockService)
+ .readRows(any(ReadRowsRequest.class), any());
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ long opLatency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_OP_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(opLatency).isIn(Range.closed(sleepTime, elapsed));
+ }
+
+ @Test
+ public void testReadRowsOpCount() throws InterruptedException {
+ doAnswer(
+ new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ observer.onNext(DEFAULT_READ_ROWS_RESPONSES);
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(mockService)
+ .readRows(any(ReadRowsRequest.class), any());
+
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+
+ long opLatency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(opLatency).isEqualTo(2);
+ }
+
+ @Test
+ public void testReadRowsFirstRow() throws InterruptedException {
+ final long beforeSleep = 50;
+ final long afterSleep = 50;
+
+ SettableFuture gotFirstRow = SettableFuture.create();
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ doAnswer(
+ invocation -> {
+ StreamObserver observer = invocation.getArgument(1);
+ executor.submit(
+ () -> {
+ Thread.sleep(beforeSleep);
+ observer.onNext(DEFAULT_READ_ROWS_RESPONSES);
+ // wait until the first row is consumed before padding the operation span
+ gotFirstRow.get();
+ Thread.sleep(afterSleep);
+ observer.onCompleted();
+ return null;
+ });
+ return null;
+ })
+ .when(mockService)
+ .readRows(any(ReadRowsRequest.class), any());
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ // Get the first row and notify the mock that it can start padding the operation span
+ Iterator it = stub.readRowsCallable().call(Query.create(TABLE_ID)).iterator();
+ it.next();
+ gotFirstRow.set(null);
+ // finish the stream
+ while (it.hasNext()) {
+ it.next();
+ }
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ executor.shutdown();
+
+ long firstRowLatency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW,
+ ImmutableMap.of(),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+
+ assertThat(firstRowLatency)
+ .isIn(
+ Range.closed(
+ beforeSleep - SLEEP_VARIABILITY, elapsed - afterSleep + SLEEP_VARIABILITY));
+ }
+
+ @Test
+ public void testReadRowsAttemptsPerOp() throws InterruptedException {
+ final AtomicInteger callCount = new AtomicInteger(0);
+
+ doAnswer(
+ new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+
+ // First call will trigger a transient error
+ if (callCount.getAndIncrement() == 0) {
+ observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ return null;
+ }
+
+ // Next attempt will return a row
+ observer.onNext(DEFAULT_READ_ROWS_RESPONSES);
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(mockService)
+ .readRows(any(ReadRowsRequest.class), any());
+
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+
+ long opLatency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(opLatency).isEqualTo(2);
+ }
+
+ @Test
+ public void testReadRowsAttemptLatency() throws InterruptedException {
+ final long sleepTime = 50;
+ final AtomicInteger callCount = new AtomicInteger(0);
+
+ doAnswer(
+ new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ @SuppressWarnings("unchecked")
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+
+ Thread.sleep(sleepTime);
+
+ // First attempt will return a transient error
+ if (callCount.getAndIncrement() == 0) {
+ observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ return null;
+ }
+ // Next attempt will be ok
+ observer.onNext(DEFAULT_READ_ROWS_RESPONSES);
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(mockService)
+ .readRows(any(ReadRowsRequest.class), any());
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ long attemptLatency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ // Average attempt latency will be just a single wait (as opposed to op latency which will be 2x
+ // sleeptime)
+ assertThat(attemptLatency).isIn(Range.closed(sleepTime, elapsed - sleepTime));
+ }
+
+ @Test
+ public void testInvalidRequest() {
+ try {
+ stub.bulkMutateRowsCallable().call(BulkMutation.create(TABLE_ID));
+ Assert.fail("Invalid request should throw exception");
+ } catch (IllegalStateException e) {
+ // Verify that the latency is recorded with an error code (in this case UNKNOWN)
+ long attemptLatency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("UNKNOWN")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(attemptLatency).isAtLeast(0);
+ }
+ }
+
+ @Test
+ public void testBatchReadRowsThrottledTime() throws Exception {
+ doAnswer(
+ new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ observer.onNext(DEFAULT_READ_ROWS_RESPONSES);
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(mockService)
+ .readRows(any(ReadRowsRequest.class), any());
+
+ try (Batcher batcher =
+ stub.newBulkReadRowsBatcher(Query.create(TABLE_ID), GrpcCallContext.createDefault())) {
+ batcher.add(ByteString.copyFromUtf8("row1"));
+ }
+
+ long throttledTimeMetric =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW,
+ ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(throttledTimeMetric).isEqualTo(0);
+ }
+
+ @Test
+ public void testBatchMutateRowsThrottledTime() throws Exception {
+ FlowController flowController = Mockito.mock(FlowController.class);
+ MutateRowsBatchingDescriptor batchingDescriptor = new MutateRowsBatchingDescriptor();
+
+ // Mock throttling
+ final long throttled = 50;
+ doAnswer(
+ invocation -> {
+ Thread.sleep(throttled);
+ return null;
+ })
+ .when(flowController)
+ .reserve(any(Long.class), any(Long.class));
+ when(flowController.getMaxElementCountLimit()).thenReturn(null);
+ when(flowController.getMaxRequestBytesLimit()).thenReturn(null);
+
+ doAnswer(
+ new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ MutateRowsRequest request = (MutateRowsRequest) invocation.getArguments()[0];
+ @SuppressWarnings("unchecked")
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
+ for (int i = 0; i < request.getEntriesCount(); i++) {
+ builder.addEntriesBuilder().setIndex(i);
+ }
+ observer.onNext(builder.build());
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(mockService)
+ .mutateRows(any(MutateRowsRequest.class), any());
+
+ ApiCallContext defaultContext = GrpcCallContext.createDefault();
+
+ try (Batcher batcher =
+ new BatcherImpl<>(
+ batchingDescriptor,
+ stub.internalBulkMutateRowsCallable().withDefaultCallContext(defaultContext),
+ BulkMutation.create(TABLE_ID),
+ settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings(),
+ Executors.newSingleThreadScheduledExecutor(),
+ flowController,
+ defaultContext)) {
+
+ batcher.add(RowMutationEntry.create("key").deleteRow());
+ }
+
+ long throttledTimeMetric =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(throttledTimeMetric).isAtLeast(throttled);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static StreamObserver anyObserver(Class returnType) {
+ return (StreamObserver) any(returnType);
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/SimpleStatsComponent.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/SimpleStatsComponent.java
new file mode 100644
index 0000000000..99aed9c3b4
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/SimpleStatsComponent.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import io.opencensus.implcore.common.MillisClock;
+import io.opencensus.implcore.internal.SimpleEventQueue;
+import io.opencensus.implcore.stats.StatsComponentImplBase;
+
+/** A StatsComponent implementation for testing that executes all events inline. */
+public class SimpleStatsComponent extends StatsComponentImplBase {
+ public SimpleStatsComponent() {
+ super(new SimpleEventQueue(), MillisClock.getInstance());
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java
new file mode 100644
index 0000000000..e808af8a84
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java
@@ -0,0 +1,351 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import io.grpc.Context;
+import io.opencensus.common.Scope;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.Measure;
+import io.opencensus.stats.MeasureMap;
+import io.opencensus.stats.StatsComponent;
+import io.opencensus.stats.StatsRecorder;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.tags.Tag;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagContextBuilder;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagMetadata;
+import io.opencensus.tags.TagMetadata.TagTtl;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tagger;
+import io.opencensus.tags.unsafe.ContextUtils;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+class StatsTestUtils {
+ private StatsTestUtils() {}
+
+ public static class MetricsRecord {
+ public final ImmutableMap tags;
+ public final ImmutableMap metrics;
+
+ private MetricsRecord(
+ ImmutableMap tags, ImmutableMap metrics) {
+ this.tags = tags;
+ this.metrics = metrics;
+ }
+
+ /** Returns the value of a metric, or {@code null} if not found. */
+ @Nullable
+ public Double getMetric(Measure measure) {
+ for (Map.Entry m : metrics.entrySet()) {
+ if (m.getKey().equals(measure)) {
+ Number value = m.getValue();
+ if (value instanceof Double) {
+ return (Double) value;
+ } else if (value instanceof Long) {
+ return (double) (Long) value;
+ }
+ throw new AssertionError("Unexpected measure value type: " + value.getClass().getName());
+ }
+ }
+ return null;
+ }
+
+ /** Returns the value of a metric converted to long, or throw if not found. */
+ public long getMetricAsLongOrFail(Measure measure) {
+ Double doubleValue = getMetric(measure);
+ checkNotNull(doubleValue, "Measure not found: %s", measure.getName());
+ long longValue = (long) (Math.abs(doubleValue) + 0.0001);
+ if (doubleValue < 0) {
+ longValue = -longValue;
+ }
+ return longValue;
+ }
+ }
+
+ /**
+ * A {@link Tagger} implementation that saves metrics records to be accessible from {@link
+ * #pollRecord()} and {@link #pollRecord(long, TimeUnit)}, until {@link #rolloverRecords} is
+ * called.
+ */
+ public static final class FakeStatsRecorder extends StatsRecorder {
+
+ private BlockingQueue records;
+
+ public FakeStatsRecorder() {
+ rolloverRecords();
+ }
+
+ @Override
+ public MeasureMap newMeasureMap() {
+ return new FakeStatsRecord(this);
+ }
+
+ public MetricsRecord pollRecord() {
+ return getCurrentRecordSink().poll();
+ }
+
+ public MetricsRecord pollRecord(long timeout, TimeUnit unit) throws InterruptedException {
+ return getCurrentRecordSink().poll(timeout, unit);
+ }
+
+ /**
+ * Disconnect this tagger with the contexts it has created so far. The records from those
+ * contexts will not show up in {@link #pollRecord}. Useful for isolating the records between
+ * test cases.
+ */
+ // This needs to be synchronized with getCurrentRecordSink() which may run concurrently.
+ public synchronized void rolloverRecords() {
+ records = new LinkedBlockingQueue<>();
+ }
+
+ private synchronized BlockingQueue getCurrentRecordSink() {
+ return records;
+ }
+ }
+
+ public static final class FakeTagger extends Tagger {
+
+ @Override
+ public FakeTagContext empty() {
+ return FakeTagContext.EMPTY;
+ }
+
+ @Override
+ public TagContext getCurrentTagContext() {
+ return ContextUtils.getValue(Context.current());
+ }
+
+ @Override
+ public TagContextBuilder emptyBuilder() {
+ return new FakeTagContextBuilder(ImmutableMap.of());
+ }
+
+ @Override
+ public FakeTagContextBuilder toBuilder(TagContext tags) {
+ return new FakeTagContextBuilder(getTags(tags));
+ }
+
+ @Override
+ public TagContextBuilder currentBuilder() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Scope withTagContext(TagContext tags) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static final class FakeStatsRecord extends MeasureMap {
+ private final BlockingQueue recordSink;
+ public final Map metrics = Maps.newHashMap();
+
+ private FakeStatsRecord(FakeStatsRecorder statsRecorder) {
+ this.recordSink = statsRecorder.getCurrentRecordSink();
+ }
+
+ @Override
+ public MeasureMap put(Measure.MeasureDouble measure, double value) {
+ metrics.put(measure, value);
+ return this;
+ }
+
+ @Override
+ public MeasureMap put(Measure.MeasureLong measure, long value) {
+ metrics.put(measure, value);
+ return this;
+ }
+
+ @Override
+ public void record(TagContext tags) {
+ recordSink.add(new MetricsRecord(getTags(tags), ImmutableMap.copyOf(metrics)));
+ }
+
+ @Override
+ public void record() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static final class FakeTagContext extends TagContext {
+ private static final FakeTagContext EMPTY =
+ new FakeTagContext(ImmutableMap.of());
+
+ private static final TagMetadata METADATA_PROPAGATING =
+ TagMetadata.create(TagTtl.UNLIMITED_PROPAGATION);
+
+ private final ImmutableMap tags;
+
+ private FakeTagContext(ImmutableMap tags) {
+ this.tags = tags;
+ }
+
+ public ImmutableMap getTags() {
+ return tags;
+ }
+
+ @Override
+ public String toString() {
+ return "[tags=" + tags + "]";
+ }
+
+ @Override
+ protected Iterator getIterator() {
+ return Iterators.transform(
+ tags.entrySet().iterator(),
+ new Function, Tag>() {
+ @Override
+ public Tag apply(@Nullable Map.Entry entry) {
+ return Tag.create(entry.getKey(), entry.getValue(), METADATA_PROPAGATING);
+ }
+ });
+ }
+ }
+
+ public static class FakeTagContextBuilder extends TagContextBuilder {
+
+ private final Map tagsBuilder = Maps.newHashMap();
+
+ private FakeTagContextBuilder(Map tags) {
+ tagsBuilder.putAll(tags);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public TagContextBuilder put(TagKey key, TagValue value) {
+ tagsBuilder.put(key, value);
+ return this;
+ }
+
+ @Override
+ public TagContextBuilder remove(TagKey key) {
+ tagsBuilder.remove(key);
+ return this;
+ }
+
+ @Override
+ public TagContext build() {
+ FakeTagContext context = new FakeTagContext(ImmutableMap.copyOf(tagsBuilder));
+ return context;
+ }
+
+ @Override
+ public Scope buildScoped() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ // This method handles the default TagContext, which isn't an instance of FakeTagContext.
+ private static ImmutableMap getTags(TagContext tags) {
+ return tags instanceof FakeTagContext
+ ? ((FakeTagContext) tags).getTags()
+ : ImmutableMap.of();
+ }
+
+ public static long getAggregationValueAsLong(
+ StatsComponent stats,
+ View view,
+ ImmutableMap tags,
+ String projectId,
+ String instanceId,
+ String appProfileId) {
+ ViewData viewData = stats.getViewManager().getView(view.getName());
+ Map, AggregationData> aggregationMap =
+ Objects.requireNonNull(viewData).getAggregationMap();
+
+ List tagValues = new ArrayList<>();
+
+ for (TagKey column : view.getColumns()) {
+ if (RpcMeasureConstants.BIGTABLE_PROJECT_ID == column) {
+ tagValues.add(TagValue.create(projectId));
+ } else if (RpcMeasureConstants.BIGTABLE_INSTANCE_ID == column) {
+ tagValues.add(TagValue.create(instanceId));
+ } else if (RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID == column) {
+ tagValues.add(TagValue.create(appProfileId));
+ } else {
+ tagValues.add(tags.get(column));
+ }
+ }
+
+ AggregationData aggregationData = aggregationMap.get(tagValues);
+
+ if (aggregationData == null) {
+ throw new RuntimeException(
+ "Failed to find metric for: " + tags + ". Current aggregation data: " + aggregationMap);
+ }
+
+ return aggregationData.match(
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.SumDataDouble arg) {
+ return (long) arg.getSum();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.SumDataLong arg) {
+ return arg.getSum();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.CountData arg) {
+ return arg.getCount();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.DistributionData arg) {
+ return (long) arg.getMean();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.LastValueDataDouble arg) {
+ return (long) arg.getLastValue();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.LastValueDataLong arg) {
+ return arg.getLastValue();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData arg) {
+ throw new UnsupportedOperationException();
+ }
+ });
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java
new file mode 100644
index 0000000000..3c0fb4e617
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.DeadlineExceededException;
+import com.google.common.util.concurrent.Futures;
+import io.grpc.Status;
+import io.opencensus.tags.TagValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class UtilTest {
+ @Test
+ public void testOk() {
+ TagValue tagValue = TagValue.create(Util.extractStatus((Throwable) null));
+ assertThat(tagValue.asString()).isEqualTo("OK");
+ }
+
+ @Test
+ public void testOkFuture() {
+ TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateFuture(null));
+ assertThat(tagValue.asString()).isEqualTo("OK");
+ }
+
+ @Test
+ public void testError() {
+ DeadlineExceededException error =
+ new DeadlineExceededException(
+ "Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true);
+ TagValue tagValue = TagValue.create(Util.extractStatus(error));
+ assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED");
+ }
+
+ @Test
+ public void testErrorFuture() {
+ DeadlineExceededException error =
+ new DeadlineExceededException(
+ "Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true);
+ TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateFailedFuture(error));
+ assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED");
+ }
+
+ @Test
+ public void testCancelledFuture() {
+ TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateCancelledFuture());
+ assertThat(tagValue.asString()).isEqualTo("CANCELLED");
+ }
+}