diff --git a/google-cloud-bigtable-deps-bom/pom.xml b/google-cloud-bigtable-deps-bom/pom.xml index 0cbc515758..7cbf424ed9 100644 --- a/google-cloud-bigtable-deps-bom/pom.xml +++ b/google-cloud-bigtable-deps-bom/pom.xml @@ -78,6 +78,12 @@ pom import + + + io.opencensus + opencensus-contrib-resource-util + 0.31.1 + diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index 8f6e571c20..a9734b96d4 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -382,23 +382,4 @@ * * - - - 7002 - com/google/cloud/bigtable/data/v2/BigtableDataSettings - *OpenCensusStats* - - - 8001 - com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory* - - - 8001 - com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants* - - - 8001 - com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews* - - diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index 4f3f4645e9..5bffc2ffbc 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -138,6 +138,10 @@ com.google.code.gson gson + + io.opencensus + opencensus-api + io.grpc grpc-alts @@ -180,6 +184,10 @@ io.grpc grpc-util + + io.grpc + grpc-core + io.grpc grpc-googleapis @@ -316,6 +324,11 @@ grpc-testing test + + io.opencensus + opencensus-impl + test + junit junit @@ -696,6 +709,20 @@ + + org.apache.maven.plugins + maven-dependency-plugin + + + + io.opencensus:opencensus-impl-core + + + + maven-failsafe-plugin diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index 0b6d948fe5..b8a514433f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -145,6 +145,62 @@ public static Builder newBuilderForEmulator(String hostname, int port) { return builder; } + /** + * @deprecated OpenCensus support is deprecated and will be removed in a future version + * Enables OpenCensus metric aggregations. + * + *

This will register Bigtable client relevant {@link io.opencensus.stats.View}s. When coupled + * with an exporter, it allows users to monitor client behavior. + * + *

Please note that in addition to calling this method, the application must: + *

    + *
  • Include openensus-impl dependency on the classpath + *
  • Configure an exporter like opencensus-exporter-stats-stackdriver + *
+ * + *

Example usage for maven: + *

{@code
+   *   
+   *     io.opencensus
+   *     opencensus-impl
+   *     ${opencensus.version}
+   *     runtime
+   *   
+   *
+   *   
+   *     io.opencensus
+   *     opencensus-exporter-stats-stackdriver
+   *     ${opencensus.version}
+   *   
+   * 
+ * + * Java: + *
{@code
+   *   StackdriverStatsExporter.createAndRegister();
+   *   BigtableDataSettings.enableOpenCensusStats();
+   * }
+ */ + @Deprecated + public static void enableOpenCensusStats() { + com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientViews(); + // TODO(igorbernstein): Enable grpc views once we upgrade to grpc-java 1.24.0 + // Required change: https://github.com/grpc/grpc-java/pull/5996 + // io.opencensus.contrib.grpc.metrics.RpcViews.registerClientGrpcBasicViews(); + } + + /** + * @deprecated OpenCensus support is deprecated and will be removed in a future version Enables + * OpenCensus GFE metric aggregations. + *

This will register views for gfe_latency and gfe_header_missing_count metrics. + *

gfe_latency measures the latency between Google's network receives an RPC and reads back + * the first byte of the response. gfe_header_missing_count is a counter of the number of RPC + * responses received without the server-timing header. + */ + @Deprecated + public static void enableGfeOpenCensusStats() { + com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientGfeViews(); + } + /** * Register built in metrics. * diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 42b46ab3b5..5f6b69dea8 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -28,6 +28,7 @@ import com.google.api.gax.batching.BatcherImpl; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcCallSettings; import com.google.api.gax.grpc.GrpcRawCallableFactory; @@ -47,6 +48,7 @@ import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.OpencensusTracerFactory; import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.api.gax.tracing.TracedUnaryCallable; @@ -98,6 +100,8 @@ import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory; import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory; +import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory; +import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants; import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable; @@ -122,6 +126,7 @@ import com.google.cloud.bigtable.data.v2.stub.sql.SqlRowMergingCallable; import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Functions; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -130,6 +135,12 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import io.grpc.MethodDescriptor; +import io.opencensus.stats.Stats; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.Attributes; import java.io.IOException; @@ -215,11 +226,47 @@ public static BigtableClientContext createBigtableClientContext( public static ApiTracerFactory createBigtableTracerFactory( EnhancedBigtableStubSettings settings, @Nullable OpenTelemetry openTelemetry) throws IOException { + return createBigtableTracerFactory( + settings, Tags.getTagger(), Stats.getStatsRecorder(), openTelemetry); + } - ImmutableList.Builder tracerFactories = ImmutableList.builder(); - - tracerFactories.add(settings.getTracerFactory()); + @VisibleForTesting + public static ApiTracerFactory createBigtableTracerFactory( + EnhancedBigtableStubSettings settings, + Tagger tagger, + StatsRecorder stats, + @Nullable OpenTelemetry openTelemetry) + throws IOException { + String projectId = settings.getProjectId(); + String instanceId = settings.getInstanceId(); + String appProfileId = settings.getAppProfileId(); + + ImmutableMap attributes = + ImmutableMap.builder() + .put(RpcMeasureConstants.BIGTABLE_PROJECT_ID, TagValue.create(projectId)) + .put(RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(instanceId)) + .put(RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, TagValue.create(appProfileId)) + .build(); + ImmutableList.Builder tracerFactories = ImmutableList.builder(); + tracerFactories + .add( + // Add OpenCensus Tracing + new OpencensusTracerFactory( + ImmutableMap.builder() + // Annotate traces with the same tags as metrics + .put(RpcMeasureConstants.BIGTABLE_PROJECT_ID.getName(), projectId) + .put(RpcMeasureConstants.BIGTABLE_INSTANCE_ID.getName(), instanceId) + .put(RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID.getName(), appProfileId) + // Also annotate traces with library versions + .put("gax", GaxGrpcProperties.getGaxGrpcVersion()) + .put("grpc", GaxGrpcProperties.getGrpcVersion()) + .put("gapic", Version.VERSION) + .build())) + // Add OpenCensus Metrics + .add(MetricsTracerFactory.create(tagger, stats, attributes)) + // Add user configured tracer + .add(settings.getTracerFactory()); BuiltinMetricsTracerFactory builtinMetricsTracerFactory = openTelemetry != null ? BuiltinMetricsTracerFactory.create(openTelemetry, createBuiltinAttributes(settings)) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java index f8f84e651a..13b832b8b1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java @@ -31,9 +31,9 @@ * This callable will *

  • -Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC methods upon * completion. The {@link BigtableTracer} will process metrics that were injected in the - * header/trailer and publish them. If {@link GrpcResponseMetadata#getMetadata()} returned null, - * it probably means that the request has never reached GFE, and it'll increment the - * gfe_header_missing_counter in this case. + * header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()} + * returned null, it probably means that the request has never reached GFE, and it'll increment + * the gfe_header_missing_counter in this case. *
  • -This class will also access trailers from {@link GrpcResponseMetadata} to record zone and * cluster ids. *
  • -Call {@link BigtableTracer#onRequest(int)} to record the request events in a stream. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java index 8bf31dfc9d..37ba74bfdb 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java @@ -30,9 +30,9 @@ * This callable will: *
  • - Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC methods upon * completion. The {@link BigtableTracer} will process metrics that were injected in the - * header/trailer and publish them. If {@link GrpcResponseMetadata#getMetadata()} returned null, - * it probably means that the request has never reached GFE, and it'll increment the - * gfe_header_missing_counter in this case. + * header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()} + * returned null, it probably means that the request has never reached GFE, and it'll increment + * the gfe_header_missing_counter in this case. *
  • -This class will also access trailers from {@link GrpcResponseMetadata} to record zone and * cluster ids. *
  • -This class will also inject a {@link BigtableGrpcStreamTracer} that'll record the time an diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index c9f33c950b..1f95224185 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -122,8 +122,8 @@ static TransportAttrs create(@Nullable String locality, @Nullable String backend private TransportAttrs transportAttrs = null; - // Server histogram buckets use [start, end), however OpenTelemetry uses (start, end]. To work - // around this, we measure all the latencies in nanoseconds and convert them + // OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start, + // end]. To work around this, we measure all the latencies in nanoseconds and convert them // to milliseconds and use DoubleHistogram. This should minimize the chance of a data // point fall on the bucket boundary that causes off by one errors. private final DoubleHistogram operationLatenciesHistogram; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java new file mode 100644 index 0000000000..c322b75df8 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -0,0 +1,263 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeDuration; + +import com.google.api.core.ObsoleteApi; +import com.google.api.gax.retrying.ServerStreamingAttemptException; +import com.google.api.gax.tracing.ApiTracerFactory.OperationType; +import com.google.api.gax.tracing.SpanName; +import com.google.common.base.Stopwatch; +import io.opencensus.stats.MeasureMap; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; + +class MetricsTracer extends BigtableTracer { + + private final OperationType operationType; + + private final Tagger tagger; + private final StatsRecorder stats; + + // Tags + private final TagContext parentContext; + private final SpanName spanName; + private final Map statsAttributes; + + // Operation level metrics + private final AtomicBoolean opFinished = new AtomicBoolean(); + private final Stopwatch operationTimer = Stopwatch.createStarted(); + private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted(); + private long operationResponseCount = 0; + + // Attempt level metrics + private int attemptCount = 0; + private Stopwatch attemptTimer; + private long attemptResponseCount = 0; + + private volatile int attempt = 0; + + private volatile boolean reportBatchingLatency = false; + private volatile long batchThrottledLatency = 0; + + MetricsTracer( + OperationType operationType, + Tagger tagger, + StatsRecorder stats, + SpanName spanName, + Map statsAttributes) { + this.operationType = operationType; + this.tagger = tagger; + this.stats = stats; + this.parentContext = tagger.getCurrentTagContext(); + this.spanName = spanName; + this.statsAttributes = statsAttributes; + } + + @Override + public Scope inScope() { + return new Scope() { + @Override + public void close() {} + }; + } + + @Override + public void operationFinishEarly() { + attemptTimer.stop(); + operationTimer.stop(); + } + + @Override + public void operationSucceeded() { + recordOperationCompletion(null); + } + + @Override + public void operationCancelled() { + recordOperationCompletion(new CancellationException()); + } + + @Override + public void operationFailed(Throwable throwable) { + recordOperationCompletion(throwable); + } + + private void recordOperationCompletion(@Nullable Throwable throwable) { + if (!opFinished.compareAndSet(false, true)) { + return; + } + + long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS); + + MeasureMap measures = + stats + .newMeasureMap() + .put(RpcMeasureConstants.BIGTABLE_OP_LATENCY, elapsed) + .put(RpcMeasureConstants.BIGTABLE_OP_ATTEMPT_COUNT, attemptCount); + + if (operationType == OperationType.ServerStreaming + && spanName.getMethodName().equals("ReadRows")) { + measures.put( + RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY, + firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS)); + } + + TagContextBuilder tagCtx = + newTagCtxBuilder() + .putLocal( + RpcMeasureConstants.BIGTABLE_STATUS, + TagValue.create(Util.extractStatus(throwable))); + + measures.record(tagCtx.build()); + } + + @Override + public void attemptStarted(int attemptNumber) { + attempt = attemptNumber; + attemptCount++; + attemptTimer = Stopwatch.createStarted(); + attemptResponseCount = 0; + } + + @Override + public void attemptSucceeded() { + recordAttemptCompletion(null); + } + + @Override + public void attemptCancelled() { + recordAttemptCompletion(new CancellationException()); + } + + /** + * This method is obsolete. Use {@link #attemptFailedDuration(Throwable, java.time.Duration)} + * instead. + */ + @ObsoleteApi("Use attemptFailedDuration(Throwable, java.time.Duration) instead") + @Override + public void attemptFailed(Throwable error, org.threeten.bp.Duration delay) { + attemptFailedDuration(error, toJavaTimeDuration(delay)); + } + + @Override + public void attemptFailedDuration(Throwable throwable, java.time.Duration duration) { + recordAttemptCompletion(throwable); + } + + @Override + public void attemptFailedRetriesExhausted(Throwable throwable) { + recordAttemptCompletion(throwable); + } + + @Override + public void attemptPermanentFailure(Throwable throwable) { + recordAttemptCompletion(throwable); + } + + private void recordAttemptCompletion(@Nullable Throwable throwable) { + MeasureMap measures = + stats + .newMeasureMap() + .put( + RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY, + attemptTimer.elapsed(TimeUnit.MILLISECONDS)); + + if (reportBatchingLatency) { + measures.put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, batchThrottledLatency); + + // Reset batch throttling latency for next attempt. This can't be done in attemptStarted + // because batching flow control will add batching latency before the attempt has started. + batchThrottledLatency = 0; + } + + // Patch the throwable until it's fixed in gax. When an attempt failed, + // it'll throw a ServerStreamingAttemptException. Unwrap the exception + // so it could get processed by extractStatus + if (throwable instanceof ServerStreamingAttemptException) { + throwable = throwable.getCause(); + } + + TagContextBuilder tagCtx = + newTagCtxBuilder() + .putLocal( + RpcMeasureConstants.BIGTABLE_STATUS, + TagValue.create(Util.extractStatus(throwable))); + + measures.record(tagCtx.build()); + } + + @Override + public void responseReceived() { + if (firstResponsePerOpTimer.isRunning()) { + firstResponsePerOpTimer.stop(); + } + attemptResponseCount++; + operationResponseCount++; + } + + @Override + public int getAttempt() { + return attempt; + } + + @Override + public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) { + MeasureMap measures = stats.newMeasureMap(); + if (latency != null) { + measures + .put(RpcMeasureConstants.BIGTABLE_GFE_LATENCY, latency) + .put(RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT, 0L); + } else { + measures.put(RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT, 1L); + } + measures.record( + newTagCtxBuilder() + .putLocal( + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create(Util.extractStatus(throwable))) + .build()); + } + + @Override + public void batchRequestThrottled(long totalThrottledMs) { + reportBatchingLatency = true; + batchThrottledLatency += totalThrottledMs; + } + + private TagContextBuilder newTagCtxBuilder() { + TagContextBuilder tagCtx = + tagger.toBuilder(parentContext) + .putLocal(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(spanName.toString())); + + // Copy client level tags in + for (Entry entry : statsAttributes.entrySet()) { + tagCtx.putLocal(entry.getKey(), entry.getValue()); + } + + return tagCtx; + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java new file mode 100644 index 0000000000..e0c173a2be --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java @@ -0,0 +1,55 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.InternalApi; +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.BaseApiTracerFactory; +import com.google.api.gax.tracing.SpanName; +import com.google.common.collect.ImmutableMap; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; + +/** + * {@link ApiTracerFactory} that will generate OpenCensus metrics by using the {@link ApiTracer} + * api. + */ +@InternalApi("For internal use only") +public class MetricsTracerFactory extends BaseApiTracerFactory { + private final Tagger tagger; + private final StatsRecorder stats; + private final ImmutableMap statsAttributes; + + public static MetricsTracerFactory create( + Tagger tagger, StatsRecorder stats, ImmutableMap statsAttributes) { + return new MetricsTracerFactory(tagger, stats, statsAttributes); + } + + private MetricsTracerFactory( + Tagger tagger, StatsRecorder stats, ImmutableMap statsAttributes) { + this.tagger = tagger; + this.stats = stats; + this.statsAttributes = statsAttributes; + } + + @Override + public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) { + return new MetricsTracer(operationType, tagger, stats, spanName, statsAttributes); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java new file mode 100644 index 0000000000..560bb084bf --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java @@ -0,0 +1,100 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.InternalApi; +import io.opencensus.stats.Measure.MeasureLong; +import io.opencensus.tags.TagKey; + +@InternalApi("For internal use only") +public class RpcMeasureConstants { + // TagKeys + public static final TagKey BIGTABLE_PROJECT_ID = TagKey.create("bigtable_project_id"); + public static final TagKey BIGTABLE_INSTANCE_ID = TagKey.create("bigtable_instance_id"); + public static final TagKey BIGTABLE_APP_PROFILE_ID = TagKey.create("bigtable_app_profile_id"); + + /** Tag key that represents a Bigtable operation name. */ + static final TagKey BIGTABLE_OP = TagKey.create("bigtable_op"); + + /** Tag key that represents the final status of the Bigtable operation. */ + static final TagKey BIGTABLE_STATUS = TagKey.create("bigtable_status"); + + // Units + /** Unit to represent counts. */ + private static final String COUNT = "1"; + + /** Unit to represent milliseconds. */ + private static final String MILLISECOND = "ms"; + + // Measurements + /** + * Latency for a logic operation, which will include latencies for each attempt and exponential + * backoff delays. + */ + static final MeasureLong BIGTABLE_OP_LATENCY = + MeasureLong.create( + "cloud.google.com/java/bigtable/op_latency", + "Time between request being sent to last row received, " + + "or terminal error of the last retry attempt.", + MILLISECOND); + + /** + * Number of attempts a logical operation took to complete. Under normal circumstances should be + * 1. + */ + static final MeasureLong BIGTABLE_OP_ATTEMPT_COUNT = + MeasureLong.MeasureLong.create( + "cloud.google.com/java/bigtable/op_attempt_count", + "Number of attempts per operation", + COUNT); + + /** Latency that a single attempt (RPC) took to complete. */ + static final MeasureLong BIGTABLE_ATTEMPT_LATENCY = + MeasureLong.create( + "cloud.google.com/java/bigtable/attempt_latency", + "Duration of an individual operation attempt", + MILLISECOND); + + /** Latency for the caller to see the first row in a ReadRows stream. */ + static final MeasureLong BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY = + MeasureLong.create( + "cloud.google.com/java/bigtable/read_rows_first_row_latency", + "Time between request being sent to the first row received", + MILLISECOND); + + /** GFE t4t7 latency extracted from server-timing header. */ + public static final MeasureLong BIGTABLE_GFE_LATENCY = + MeasureLong.create( + "cloud.google.com/java/bigtable/gfe_latency", + "Latency between Google's network receives an RPC and reads back the first byte of the" + + " response", + MILLISECOND); + + /** Number of responses without the server-timing header. */ + public static final MeasureLong BIGTABLE_GFE_HEADER_MISSING_COUNT = + MeasureLong.create( + "cloud.google.com/java/bigtable/gfe_header_missing_count", + "Number of RPC responses received without the server-timing header, most likely means" + + " that the RPC never reached Google's network", + COUNT); + + /** Total throttled time of a batch in msecs. */ + public static final MeasureLong BIGTABLE_BATCH_THROTTLED_TIME = + MeasureLong.create( + "cloud.google.com/java/bigtable/batch_throttled_time", + "Total throttled time of a batch in msecs", + MILLISECOND); +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java new file mode 100644 index 0000000000..4e21eaf785 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java @@ -0,0 +1,170 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP_ATTEMPT_COUNT; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP_LATENCY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_PROJECT_ID; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_STATUS; + +import com.google.common.collect.ImmutableList; +import io.opencensus.stats.Aggregation; +import io.opencensus.stats.Aggregation.Count; +import io.opencensus.stats.Aggregation.Distribution; +import io.opencensus.stats.Aggregation.Sum; +import io.opencensus.stats.BucketBoundaries; +import io.opencensus.stats.View; +import java.util.Arrays; + +class RpcViewConstants { + // Aggregations + private static final Aggregation COUNT = Count.create(); + private static final Aggregation SUM = Sum.create(); + + private static final Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM = + Distribution.create( + BucketBoundaries.create( + ImmutableList.of( + 0.0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, + 13.0, 16.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, + 250.0, 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, + 20000.0, 50000.0, 100000.0))); + + private static final Aggregation AGGREGATION_ATTEMPT_COUNT = + Distribution.create( + BucketBoundaries.create( + ImmutableList.of( + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 15.0, 20.0, 30.0, 40.0, 50.0, + 100.0))); + + private static final Aggregation AGGREGATION_WITH_POWERS_OF_2 = + Distribution.create( + BucketBoundaries.create( + ImmutableList.of( + 0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0, 2048.0, + 4096.0, 8192.0, 16384.0, 32768.0, 65536.0, 131072.0, 262144.0, 524288.0, + 1048576.0, 2097152.0))); + + /** + * {@link View} for Bigtable client roundtrip latency in milliseconds including all retry + * attempts. + */ + static final View BIGTABLE_OP_LATENCY_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/op_latency"), + "Operation latency in msecs", + BIGTABLE_OP_LATENCY, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of( + BIGTABLE_PROJECT_ID, + BIGTABLE_INSTANCE_ID, + BIGTABLE_APP_PROFILE_ID, + BIGTABLE_OP, + BIGTABLE_STATUS)); + + static final View BIGTABLE_COMPLETED_OP_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/completed_ops"), + "Number of completed Bigtable client operations", + BIGTABLE_OP_LATENCY, + COUNT, + Arrays.asList( + BIGTABLE_PROJECT_ID, + BIGTABLE_INSTANCE_ID, + BIGTABLE_APP_PROFILE_ID, + BIGTABLE_OP, + BIGTABLE_STATUS)); + + static final View BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/read_rows_first_row_latency"), + "Latency to receive the first row in a ReadRows stream", + BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of(BIGTABLE_PROJECT_ID, BIGTABLE_INSTANCE_ID, BIGTABLE_APP_PROFILE_ID)); + + static final View BIGTABLE_ATTEMPT_LATENCY_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/attempt_latency"), + "Attempt latency in msecs", + BIGTABLE_ATTEMPT_LATENCY, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of( + BIGTABLE_PROJECT_ID, + BIGTABLE_INSTANCE_ID, + BIGTABLE_APP_PROFILE_ID, + BIGTABLE_OP, + BIGTABLE_STATUS)); + + static final View BIGTABLE_ATTEMPTS_PER_OP_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/attempts_per_op"), + "Distribution of attempts per logical operation", + BIGTABLE_OP_ATTEMPT_COUNT, + AGGREGATION_ATTEMPT_COUNT, + ImmutableList.of( + BIGTABLE_PROJECT_ID, + BIGTABLE_INSTANCE_ID, + BIGTABLE_APP_PROFILE_ID, + BIGTABLE_OP, + BIGTABLE_STATUS)); + + static final View BIGTABLE_GFE_LATENCY_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/gfe_latency"), + "Latency between Google's network receives an RPC and reads back the first byte of the" + + " response", + BIGTABLE_GFE_LATENCY, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of( + BIGTABLE_INSTANCE_ID, + BIGTABLE_PROJECT_ID, + BIGTABLE_APP_PROFILE_ID, + BIGTABLE_OP, + BIGTABLE_STATUS)); + + static final View BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/gfe_header_missing_count"), + "Number of RPC responses received without the server-timing header, most likely means" + + " that the RPC never reached Google's network", + BIGTABLE_GFE_HEADER_MISSING_COUNT, + SUM, + ImmutableList.of( + BIGTABLE_INSTANCE_ID, + BIGTABLE_PROJECT_ID, + BIGTABLE_APP_PROFILE_ID, + BIGTABLE_OP, + BIGTABLE_STATUS)); + + // use distribution so we can correlate batch throttled time with op_latency + static final View BIGTABLE_BATCH_THROTTLED_TIME_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/batch_throttled_time"), + "Total throttled time of a batch in msecs", + BIGTABLE_BATCH_THROTTLED_TIME, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of( + BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP)); +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java new file mode 100644 index 0000000000..e8902108aa --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java @@ -0,0 +1,81 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import io.opencensus.stats.Stats; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewManager; + +@Deprecated +public class RpcViews { + @VisibleForTesting + private static final ImmutableSet BIGTABLE_CLIENT_VIEWS_SET = + ImmutableSet.of( + RpcViewConstants.BIGTABLE_OP_LATENCY_VIEW, + RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW, + RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW, + RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW, + RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW); + + private static final ImmutableSet GFE_VIEW_SET = + ImmutableSet.of( + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW); + + private static boolean gfeMetricsRegistered = false; + + /** Registers all Bigtable specific views. */ + public static void registerBigtableClientViews() { + registerBigtableClientViews(Stats.getViewManager()); + } + + /** + * Register views for GFE metrics, including gfe_latency and gfe_header_missing_count. gfe_latency + * measures the latency between Google's network receives an RPC and reads back the first byte of + * the response. gfe_header_missing_count is a counter of the number of RPC responses without a + * server-timing header. + */ + public static void registerBigtableClientGfeViews() { + registerBigtableClientGfeViews(Stats.getViewManager()); + } + + @VisibleForTesting + static void registerBigtableClientViews(ViewManager viewManager) { + for (View view : BIGTABLE_CLIENT_VIEWS_SET) { + viewManager.registerView(view); + } + } + + @VisibleForTesting + static void registerBigtableClientGfeViews(ViewManager viewManager) { + for (View view : GFE_VIEW_SET) { + viewManager.registerView(view); + } + gfeMetricsRegistered = true; + } + + static boolean isGfeMetricsRegistered() { + return gfeMetricsRegistered; + } + + @VisibleForTesting + static void setGfeMetricsRegistered(boolean gfeMetricsRegistered) { + RpcViews.gfeMetricsRegistered = gfeMetricsRegistered; + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java index b0b40054d5..093d7800d3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/Util.java @@ -43,6 +43,7 @@ import io.grpc.Status; import io.grpc.StatusException; import io.grpc.StatusRuntimeException; +import io.opencensus.tags.TagValue; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.metrics.InstrumentSelector; import io.opentelemetry.sdk.metrics.SdkMeterProvider; @@ -56,11 +57,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; -/** Utilities to help with metrics. */ +/** Utilities to help integrating with OpenCensus. */ @InternalApi("For internal use only") public class Util { static final Metadata.Key ATTEMPT_HEADER_KEY = @@ -74,7 +77,7 @@ public class Util { static final Metadata.Key LOCATION_METADATA_KEY = Metadata.Key.of("x-goog-ext-425905942-bin", Metadata.BINARY_BYTE_MARSHALLER); - /** Convert an exception into a string. */ + /** Convert an exception into a value that can be used to create an OpenCensus tag value. */ static String extractStatus(@Nullable Throwable error) { final String statusString; @@ -95,6 +98,26 @@ static String extractStatus(@Nullable Throwable error) { return statusString; } + /** + * Await the result of the future and convert it into a value that can be used as an OpenCensus + * tag value. + */ + static TagValue extractStatusFromFuture(Future future) { + Throwable error = null; + + try { + future.get(); + } catch (InterruptedException e) { + error = e; + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + error = e.getCause(); + } catch (RuntimeException e) { + error = e; + } + return TagValue.create(extractStatus(error)); + } + static String extractTableId(Object request) { String tableName = null; String authorizedViewName = null; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index c617274cae..e2bbc1bedb 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -36,6 +36,7 @@ import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FailedPreconditionException; @@ -66,6 +67,7 @@ import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.RowSet; +import com.google.cloud.bigtable.Version; import com.google.cloud.bigtable.admin.v2.internal.NameUtil; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; @@ -114,15 +116,24 @@ import io.grpc.ServerCall.Listener; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; +import io.grpc.internal.GrpcUtil; import io.grpc.stub.StreamObserver; +import io.opencensus.common.Scope; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Tracing; +import io.opencensus.trace.export.SpanData; +import io.opencensus.trace.export.SpanExporter.Handler; +import io.opencensus.trace.samplers.Samplers; import java.io.IOException; import java.security.KeyPair; import java.security.KeyPairGenerator; import java.security.NoSuchAlgorithmException; import java.time.Instant; import java.util.Base64; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -545,6 +556,61 @@ public void testUserAgent() throws InterruptedException { .containsMatch("bigtable-java/\\d+\\.\\d+\\.\\d+(?:-SNAPSHOT)?"); } + @Test + public void testSpanAttributes() throws InterruptedException { + final BlockingQueue spans = new ArrayBlockingQueue<>(100); + + // inject a temporary trace exporter + String handlerName = "stub-test-exporter"; + + Tracing.getExportComponent() + .getSpanExporter() + .registerHandler( + handlerName, + new Handler() { + @Override + public void export(Collection collection) { + spans.addAll(collection); + } + }); + + SpanData foundSpanData = null; + // Issue the rpc and grab the span + try { + try (Scope ignored = + Tracing.getTracer() + .spanBuilder("fake-parent-span") + .setSampler(Samplers.alwaysSample()) + .startScopedSpan()) { + enhancedBigtableStub.readRowCallable().call(Query.create("table-id").rowKey("row-key")); + } + + for (int i = 0; i < 100; i++) { + SpanData spanData = spans.poll(10, TimeUnit.SECONDS); + if ("Bigtable.ReadRow".equals(spanData.getName())) { + foundSpanData = spanData; + break; + } + } + } finally { + // cleanup + Tracing.getExportComponent().getSpanExporter().unregisterHandler(handlerName); + } + + // Examine the caught span + assertThat(foundSpanData).isNotNull(); + assertThat(foundSpanData.getAttributes().getAttributeMap()) + .containsEntry("gapic", AttributeValue.stringAttributeValue(Version.VERSION)); + assertThat(foundSpanData.getAttributes().getAttributeMap()) + .containsEntry( + "grpc", + AttributeValue.stringAttributeValue( + GrpcUtil.getGrpcBuildVersion().getImplementationVersion())); + assertThat(foundSpanData.getAttributes().getAttributeMap()) + .containsEntry( + "gax", AttributeValue.stringAttributeValue(GaxGrpcProperties.getGaxGrpcVersion())); + } + @Test public void testBulkMutationFlowControllerConfigured() throws Exception { BigtableDataSettings.Builder settings = diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java new file mode 100644 index 0000000000..b0966a2166 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java @@ -0,0 +1,473 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.UnavailableException; +import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase; +import com.google.bigtable.v2.CheckAndMutateRowRequest; +import com.google.bigtable.v2.CheckAndMutateRowResponse; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.ReadModifyWriteRowRequest; +import com.google.bigtable.v2.ReadModifyWriteRowResponse; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.SampleRowKeysResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; +import com.google.cloud.bigtable.data.v2.internal.NameUtil; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; +import com.google.cloud.bigtable.data.v2.models.Mutation; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.models.SampleRowKeysRequest; +import com.google.cloud.bigtable.data.v2.models.TableId; +import com.google.cloud.bigtable.data.v2.stub.BigtableClientContext; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; +import com.google.common.collect.ImmutableMap; +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import io.opencensus.stats.StatsComponent; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tags; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BigtableTracerCallableTest { + private Server server; + private Server serverNoHeader; + + private FakeService fakeService = new FakeService(); + + private final StatsComponent localStats = new SimpleStatsComponent(); + private EnhancedBigtableStub stub; + private EnhancedBigtableStub noHeaderStub; + private int attempts; + + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final String APP_PROFILE_ID = "default"; + private static final String TABLE_ID = "fake-table"; + + private static final long WAIT_FOR_METRICS_TIME_MS = 1_000; + + private AtomicInteger fakeServerTiming; + + @Before + public void setUp() throws Exception { + RpcViews.registerBigtableClientGfeViews(localStats.getViewManager()); + + // Create a server that'll inject a server-timing header with a random number and a stub that + // connects to this server. + fakeServerTiming = new AtomicInteger(new Random().nextInt(1000) + 1); + server = + FakeServiceBuilder.create(fakeService) + .intercept( + new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + return serverCallHandler.startCall( + new SimpleForwardingServerCall(serverCall) { + @Override + public void sendHeaders(Metadata headers) { + headers.put( + Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER), + String.format("gfet4t7; dur=%d", fakeServerTiming.get())); + super.sendHeaders(headers); + } + }, + metadata); + } + }) + .start(); + + BigtableDataSettings settings = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setAppProfileId(APP_PROFILE_ID) + .build(); + + BigtableClientContext bigtableClientContext = + EnhancedBigtableStub.createBigtableClientContext(settings.getStubSettings()); + ClientContext clientContext = + bigtableClientContext.getClientContext().toBuilder() + .setTracerFactory( + EnhancedBigtableStub.createBigtableTracerFactory( + settings.getStubSettings(), + Tags.getTagger(), + localStats.getStatsRecorder(), + null)) + .build(); + attempts = settings.getStubSettings().readRowsSettings().getRetrySettings().getMaxAttempts(); + stub = new EnhancedBigtableStub(settings.getStubSettings(), clientContext); + + // Create another server without injecting the server-timing header and another stub that + // connects to it. + serverNoHeader = FakeServiceBuilder.create(fakeService).start(); + + BigtableDataSettings noHeaderSettings = + BigtableDataSettings.newBuilderForEmulator(serverNoHeader.getPort()) + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setAppProfileId(APP_PROFILE_ID) + .build(); + + BigtableClientContext noHeaderBigtableClientContext = + EnhancedBigtableStub.createBigtableClientContext(noHeaderSettings.getStubSettings()); + ClientContext noHeaderClientContext = + noHeaderBigtableClientContext.getClientContext().toBuilder() + .setTracerFactory( + EnhancedBigtableStub.createBigtableTracerFactory( + noHeaderSettings.getStubSettings(), + Tags.getTagger(), + localStats.getStatsRecorder(), + null)) + .build(); + noHeaderStub = + new EnhancedBigtableStub(noHeaderSettings.getStubSettings(), noHeaderClientContext); + } + + @After + public void tearDown() { + stub.close(); + noHeaderStub.close(); + server.shutdown(); + serverNoHeader.shutdown(); + } + + @Test + public void testGFELatencyMetricReadRows() { + ServerStream call = stub.readRowsCallable().call(Query.create(TABLE_ID)); + call.forEach(r -> {}); + + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFELatencyMetricMutateRow() throws InterruptedException { + stub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "fake-key")); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFELatencyMetricMutateRows() throws InterruptedException { + BulkMutation mutations = + BulkMutation.create(TABLE_ID) + .add("key", Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value")); + stub.bulkMutateRowsCallable().call(mutations); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFELatencySampleRowKeys() throws InterruptedException { + stub.sampleRowKeysCallable().call(TABLE_ID); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.SampleRowKeys"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFELatencySampleRowKeysWithRequest() throws InterruptedException { + stub.sampleRowKeysCallableWithRequest().call(SampleRowKeysRequest.create(TableId.of(TABLE_ID))); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.SampleRowKeys"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFELatencyCheckAndMutateRow() throws InterruptedException { + ConditionalRowMutation mutation = + ConditionalRowMutation.create(TABLE_ID, "fake-key") + .then(Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value")); + stub.checkAndMutateRowCallable().call(mutation); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.CheckAndMutateRow"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFELatencyReadModifyWriteRow() throws InterruptedException { + ReadModifyWriteRow request = + ReadModifyWriteRow.create(TABLE_ID, "fake-key") + .append("fake-family", "fake-qualifier", "suffix"); + stub.readModifyWriteRowCallable().call(request); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadModifyWriteRow"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFEMissingHeaderMetric() throws InterruptedException { + // Make a few calls to the server which will inject the server-timing header and the counter + // should be 0. + stub.readRowsCallable().call(Query.create(TABLE_ID)); + stub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "key")); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long mutateRowMissingCount = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, + TagValue.create("Bigtable.MutateRow"), + RpcMeasureConstants.BIGTABLE_STATUS, + TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + long readRowsMissingCount = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + + assertThat(mutateRowMissingCount).isEqualTo(0); + assertThat(readRowsMissingCount).isEqualTo(0); + + // Make a few more calls to the server which won't add the header and the counter should match + // the number of requests sent. + int readRowsCalls = new Random().nextInt(10) + 1; + int mutateRowCalls = new Random().nextInt(10) + 1; + for (int i = 0; i < mutateRowCalls; i++) { + noHeaderStub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "fake-key" + i)); + } + for (int i = 0; i < readRowsCalls; i++) { + noHeaderStub.readRowsCallable().call(Query.create(TABLE_ID)); + } + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + + mutateRowMissingCount = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, + TagValue.create("Bigtable.MutateRow"), + RpcMeasureConstants.BIGTABLE_STATUS, + TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + readRowsMissingCount = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, + TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, + TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + + assertThat(mutateRowMissingCount).isEqualTo(mutateRowCalls); + assertThat(readRowsMissingCount).isEqualTo(readRowsCalls); + } + + @Test + public void testMetricsWithErrorResponse() throws InterruptedException { + try { + stub.readRowsCallable().call(Query.create("random-table-id")).iterator().next(); + fail("readrows should throw exception"); + } catch (Exception e) { + assertThat(e).isInstanceOf(UnavailableException.class); + } + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long missingCount = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, + TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, + TagValue.create("UNAVAILABLE")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(missingCount).isEqualTo(attempts); + } + + private class FakeService extends BigtableImplBase { + private final String defaultTableName = + NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID); + + @Override + public void readRows(ReadRowsRequest request, StreamObserver observer) { + if (!request.getTableName().equals(defaultTableName)) { + observer.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + return; + } + observer.onNext(ReadRowsResponse.getDefaultInstance()); + observer.onCompleted(); + } + + @Override + public void mutateRow(MutateRowRequest request, StreamObserver observer) { + observer.onNext(MutateRowResponse.getDefaultInstance()); + observer.onCompleted(); + } + + @Override + public void mutateRows(MutateRowsRequest request, StreamObserver observer) { + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(i)); + } + observer.onNext(builder.build()); + observer.onCompleted(); + } + + @Override + public void sampleRowKeys( + com.google.bigtable.v2.SampleRowKeysRequest request, + StreamObserver observer) { + observer.onNext(SampleRowKeysResponse.getDefaultInstance()); + observer.onCompleted(); + } + + @Override + public void checkAndMutateRow( + CheckAndMutateRowRequest request, StreamObserver observer) { + observer.onNext(CheckAndMutateRowResponse.getDefaultInstance()); + observer.onCompleted(); + } + + @Override + public void readModifyWriteRow( + ReadModifyWriteRowRequest request, StreamObserver observer) { + observer.onNext(ReadModifyWriteRowResponse.getDefaultInstance()); + observer.onCompleted(); + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java new file mode 100644 index 0000000000..fa09ba2c11 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -0,0 +1,481 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +import com.google.api.gax.batching.Batcher; +import com.google.api.gax.batching.BatcherImpl; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ClientContext; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.ReadRowsResponse.CellChunk; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.cloud.bigtable.data.v2.stub.BigtableClientContext; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; +import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.StringValue; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import io.opencensus.stats.StatsComponent; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tags; +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; + +@RunWith(JUnit4.class) +public class MetricsTracerTest { + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final String APP_PROFILE_ID = "default"; + private static final String TABLE_ID = "fake-table"; + private static final long SLEEP_VARIABILITY = 15; + + private static final ReadRowsResponse DEFAULT_READ_ROWS_RESPONSES = + ReadRowsResponse.newBuilder() + .addChunks( + CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("fake-key")) + .setFamilyName(StringValue.of("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(1_000) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build(); + + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + private Server server; + + @Mock(answer = Answers.CALLS_REAL_METHODS) + private BigtableGrpc.BigtableImplBase mockService; + + private final StatsComponent localStats = new SimpleStatsComponent(); + private EnhancedBigtableStub stub; + private BigtableDataSettings settings; + + @Before + public void setUp() throws Exception { + server = FakeServiceBuilder.create(mockService).start(); + + RpcViews.registerBigtableClientViews(localStats.getViewManager()); + + settings = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setAppProfileId(APP_PROFILE_ID) + .build(); + + BigtableClientContext bigtableClientContext = + EnhancedBigtableStub.createBigtableClientContext(settings.getStubSettings()); + ClientContext clientContext = + bigtableClientContext.getClientContext().toBuilder() + .setTracerFactory( + EnhancedBigtableStub.createBigtableTracerFactory( + settings.getStubSettings(), + Tags.getTagger(), + localStats.getStatsRecorder(), + null)) + .build(); + stub = new EnhancedBigtableStub(settings.getStubSettings(), clientContext); + } + + @After + public void tearDown() { + stub.close(); + server.shutdown(); + } + + @Test + public void testReadRowsLatency() throws InterruptedException { + final long sleepTime = 50; + + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + Thread.sleep(sleepTime); + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), any()); + + Stopwatch stopwatch = Stopwatch.createStarted(); + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + long opLatency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_OP_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(opLatency).isIn(Range.closed(sleepTime, elapsed)); + } + + @Test + public void testReadRowsOpCount() throws InterruptedException { + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), any()); + + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + + long opLatency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(opLatency).isEqualTo(2); + } + + @Test + public void testReadRowsFirstRow() throws InterruptedException { + final long beforeSleep = 50; + final long afterSleep = 50; + + SettableFuture gotFirstRow = SettableFuture.create(); + + ExecutorService executor = Executors.newCachedThreadPool(); + doAnswer( + invocation -> { + StreamObserver observer = invocation.getArgument(1); + executor.submit( + () -> { + Thread.sleep(beforeSleep); + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + // wait until the first row is consumed before padding the operation span + gotFirstRow.get(); + Thread.sleep(afterSleep); + observer.onCompleted(); + return null; + }); + return null; + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), any()); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + // Get the first row and notify the mock that it can start padding the operation span + Iterator it = stub.readRowsCallable().call(Query.create(TABLE_ID)).iterator(); + it.next(); + gotFirstRow.set(null); + // finish the stream + while (it.hasNext()) { + it.next(); + } + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + executor.shutdown(); + + long firstRowLatency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW, + ImmutableMap.of(), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + + assertThat(firstRowLatency) + .isIn( + Range.closed( + beforeSleep - SLEEP_VARIABILITY, elapsed - afterSleep + SLEEP_VARIABILITY)); + } + + @Test + public void testReadRowsAttemptsPerOp() throws InterruptedException { + final AtomicInteger callCount = new AtomicInteger(0); + + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + + // First call will trigger a transient error + if (callCount.getAndIncrement() == 0) { + observer.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + return null; + } + + // Next attempt will return a row + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), any()); + + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + + long opLatency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(opLatency).isEqualTo(2); + } + + @Test + public void testReadRowsAttemptLatency() throws InterruptedException { + final long sleepTime = 50; + final AtomicInteger callCount = new AtomicInteger(0); + + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + + Thread.sleep(sleepTime); + + // First attempt will return a transient error + if (callCount.getAndIncrement() == 0) { + observer.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + return null; + } + // Next attempt will be ok + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), any()); + + Stopwatch stopwatch = Stopwatch.createStarted(); + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + long attemptLatency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + // Average attempt latency will be just a single wait (as opposed to op latency which will be 2x + // sleeptime) + assertThat(attemptLatency).isIn(Range.closed(sleepTime, elapsed - sleepTime)); + } + + @Test + public void testInvalidRequest() { + try { + stub.bulkMutateRowsCallable().call(BulkMutation.create(TABLE_ID)); + Assert.fail("Invalid request should throw exception"); + } catch (IllegalStateException e) { + // Verify that the latency is recorded with an error code (in this case UNKNOWN) + long attemptLatency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("UNKNOWN")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(attemptLatency).isAtLeast(0); + } + } + + @Test + public void testBatchReadRowsThrottledTime() throws Exception { + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), any()); + + try (Batcher batcher = + stub.newBulkReadRowsBatcher(Query.create(TABLE_ID), GrpcCallContext.createDefault())) { + batcher.add(ByteString.copyFromUtf8("row1")); + } + + long throttledTimeMetric = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW, + ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(throttledTimeMetric).isEqualTo(0); + } + + @Test + public void testBatchMutateRowsThrottledTime() throws Exception { + FlowController flowController = Mockito.mock(FlowController.class); + MutateRowsBatchingDescriptor batchingDescriptor = new MutateRowsBatchingDescriptor(); + + // Mock throttling + final long throttled = 50; + doAnswer( + invocation -> { + Thread.sleep(throttled); + return null; + }) + .when(flowController) + .reserve(any(Long.class), any(Long.class)); + when(flowController.getMaxElementCountLimit()).thenReturn(null); + when(flowController.getMaxRequestBytesLimit()).thenReturn(null); + + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + MutateRowsRequest request = (MutateRowsRequest) invocation.getArguments()[0]; + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + observer.onNext(builder.build()); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .mutateRows(any(MutateRowsRequest.class), any()); + + ApiCallContext defaultContext = GrpcCallContext.createDefault(); + + try (Batcher batcher = + new BatcherImpl<>( + batchingDescriptor, + stub.internalBulkMutateRowsCallable().withDefaultCallContext(defaultContext), + BulkMutation.create(TABLE_ID), + settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings(), + Executors.newSingleThreadScheduledExecutor(), + flowController, + defaultContext)) { + + batcher.add(RowMutationEntry.create("key").deleteRow()); + } + + long throttledTimeMetric = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(throttledTimeMetric).isAtLeast(throttled); + } + + @SuppressWarnings("unchecked") + private static StreamObserver anyObserver(Class returnType) { + return (StreamObserver) any(returnType); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/SimpleStatsComponent.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/SimpleStatsComponent.java new file mode 100644 index 0000000000..99aed9c3b4 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/SimpleStatsComponent.java @@ -0,0 +1,27 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import io.opencensus.implcore.common.MillisClock; +import io.opencensus.implcore.internal.SimpleEventQueue; +import io.opencensus.implcore.stats.StatsComponentImplBase; + +/** A StatsComponent implementation for testing that executes all events inline. */ +public class SimpleStatsComponent extends StatsComponentImplBase { + public SimpleStatsComponent() { + super(new SimpleEventQueue(), MillisClock.getInstance()); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java new file mode 100644 index 0000000000..e808af8a84 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java @@ -0,0 +1,351 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; +import io.grpc.Context; +import io.opencensus.common.Scope; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.Measure; +import io.opencensus.stats.MeasureMap; +import io.opencensus.stats.StatsComponent; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.Tag; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagMetadata; +import io.opencensus.tags.TagMetadata.TagTtl; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.unsafe.ContextUtils; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +class StatsTestUtils { + private StatsTestUtils() {} + + public static class MetricsRecord { + public final ImmutableMap tags; + public final ImmutableMap metrics; + + private MetricsRecord( + ImmutableMap tags, ImmutableMap metrics) { + this.tags = tags; + this.metrics = metrics; + } + + /** Returns the value of a metric, or {@code null} if not found. */ + @Nullable + public Double getMetric(Measure measure) { + for (Map.Entry m : metrics.entrySet()) { + if (m.getKey().equals(measure)) { + Number value = m.getValue(); + if (value instanceof Double) { + return (Double) value; + } else if (value instanceof Long) { + return (double) (Long) value; + } + throw new AssertionError("Unexpected measure value type: " + value.getClass().getName()); + } + } + return null; + } + + /** Returns the value of a metric converted to long, or throw if not found. */ + public long getMetricAsLongOrFail(Measure measure) { + Double doubleValue = getMetric(measure); + checkNotNull(doubleValue, "Measure not found: %s", measure.getName()); + long longValue = (long) (Math.abs(doubleValue) + 0.0001); + if (doubleValue < 0) { + longValue = -longValue; + } + return longValue; + } + } + + /** + * A {@link Tagger} implementation that saves metrics records to be accessible from {@link + * #pollRecord()} and {@link #pollRecord(long, TimeUnit)}, until {@link #rolloverRecords} is + * called. + */ + public static final class FakeStatsRecorder extends StatsRecorder { + + private BlockingQueue records; + + public FakeStatsRecorder() { + rolloverRecords(); + } + + @Override + public MeasureMap newMeasureMap() { + return new FakeStatsRecord(this); + } + + public MetricsRecord pollRecord() { + return getCurrentRecordSink().poll(); + } + + public MetricsRecord pollRecord(long timeout, TimeUnit unit) throws InterruptedException { + return getCurrentRecordSink().poll(timeout, unit); + } + + /** + * Disconnect this tagger with the contexts it has created so far. The records from those + * contexts will not show up in {@link #pollRecord}. Useful for isolating the records between + * test cases. + */ + // This needs to be synchronized with getCurrentRecordSink() which may run concurrently. + public synchronized void rolloverRecords() { + records = new LinkedBlockingQueue<>(); + } + + private synchronized BlockingQueue getCurrentRecordSink() { + return records; + } + } + + public static final class FakeTagger extends Tagger { + + @Override + public FakeTagContext empty() { + return FakeTagContext.EMPTY; + } + + @Override + public TagContext getCurrentTagContext() { + return ContextUtils.getValue(Context.current()); + } + + @Override + public TagContextBuilder emptyBuilder() { + return new FakeTagContextBuilder(ImmutableMap.of()); + } + + @Override + public FakeTagContextBuilder toBuilder(TagContext tags) { + return new FakeTagContextBuilder(getTags(tags)); + } + + @Override + public TagContextBuilder currentBuilder() { + throw new UnsupportedOperationException(); + } + + @Override + public Scope withTagContext(TagContext tags) { + throw new UnsupportedOperationException(); + } + } + + public static final class FakeStatsRecord extends MeasureMap { + private final BlockingQueue recordSink; + public final Map metrics = Maps.newHashMap(); + + private FakeStatsRecord(FakeStatsRecorder statsRecorder) { + this.recordSink = statsRecorder.getCurrentRecordSink(); + } + + @Override + public MeasureMap put(Measure.MeasureDouble measure, double value) { + metrics.put(measure, value); + return this; + } + + @Override + public MeasureMap put(Measure.MeasureLong measure, long value) { + metrics.put(measure, value); + return this; + } + + @Override + public void record(TagContext tags) { + recordSink.add(new MetricsRecord(getTags(tags), ImmutableMap.copyOf(metrics))); + } + + @Override + public void record() { + throw new UnsupportedOperationException(); + } + } + + public static final class FakeTagContext extends TagContext { + private static final FakeTagContext EMPTY = + new FakeTagContext(ImmutableMap.of()); + + private static final TagMetadata METADATA_PROPAGATING = + TagMetadata.create(TagTtl.UNLIMITED_PROPAGATION); + + private final ImmutableMap tags; + + private FakeTagContext(ImmutableMap tags) { + this.tags = tags; + } + + public ImmutableMap getTags() { + return tags; + } + + @Override + public String toString() { + return "[tags=" + tags + "]"; + } + + @Override + protected Iterator getIterator() { + return Iterators.transform( + tags.entrySet().iterator(), + new Function, Tag>() { + @Override + public Tag apply(@Nullable Map.Entry entry) { + return Tag.create(entry.getKey(), entry.getValue(), METADATA_PROPAGATING); + } + }); + } + } + + public static class FakeTagContextBuilder extends TagContextBuilder { + + private final Map tagsBuilder = Maps.newHashMap(); + + private FakeTagContextBuilder(Map tags) { + tagsBuilder.putAll(tags); + } + + @SuppressWarnings("deprecation") + @Override + public TagContextBuilder put(TagKey key, TagValue value) { + tagsBuilder.put(key, value); + return this; + } + + @Override + public TagContextBuilder remove(TagKey key) { + tagsBuilder.remove(key); + return this; + } + + @Override + public TagContext build() { + FakeTagContext context = new FakeTagContext(ImmutableMap.copyOf(tagsBuilder)); + return context; + } + + @Override + public Scope buildScoped() { + throw new UnsupportedOperationException(); + } + } + + // This method handles the default TagContext, which isn't an instance of FakeTagContext. + private static ImmutableMap getTags(TagContext tags) { + return tags instanceof FakeTagContext + ? ((FakeTagContext) tags).getTags() + : ImmutableMap.of(); + } + + public static long getAggregationValueAsLong( + StatsComponent stats, + View view, + ImmutableMap tags, + String projectId, + String instanceId, + String appProfileId) { + ViewData viewData = stats.getViewManager().getView(view.getName()); + Map, AggregationData> aggregationMap = + Objects.requireNonNull(viewData).getAggregationMap(); + + List tagValues = new ArrayList<>(); + + for (TagKey column : view.getColumns()) { + if (RpcMeasureConstants.BIGTABLE_PROJECT_ID == column) { + tagValues.add(TagValue.create(projectId)); + } else if (RpcMeasureConstants.BIGTABLE_INSTANCE_ID == column) { + tagValues.add(TagValue.create(instanceId)); + } else if (RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID == column) { + tagValues.add(TagValue.create(appProfileId)); + } else { + tagValues.add(tags.get(column)); + } + } + + AggregationData aggregationData = aggregationMap.get(tagValues); + + if (aggregationData == null) { + throw new RuntimeException( + "Failed to find metric for: " + tags + ". Current aggregation data: " + aggregationMap); + } + + return aggregationData.match( + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.SumDataDouble arg) { + return (long) arg.getSum(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.SumDataLong arg) { + return arg.getSum(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.CountData arg) { + return arg.getCount(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.DistributionData arg) { + return (long) arg.getMean(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.LastValueDataDouble arg) { + return (long) arg.getLastValue(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.LastValueDataLong arg) { + return arg.getLastValue(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData arg) { + throw new UnsupportedOperationException(); + } + }); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java new file mode 100644 index 0000000000..3c0fb4e617 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/UtilTest.java @@ -0,0 +1,66 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.DeadlineExceededException; +import com.google.common.util.concurrent.Futures; +import io.grpc.Status; +import io.opencensus.tags.TagValue; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class UtilTest { + @Test + public void testOk() { + TagValue tagValue = TagValue.create(Util.extractStatus((Throwable) null)); + assertThat(tagValue.asString()).isEqualTo("OK"); + } + + @Test + public void testOkFuture() { + TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateFuture(null)); + assertThat(tagValue.asString()).isEqualTo("OK"); + } + + @Test + public void testError() { + DeadlineExceededException error = + new DeadlineExceededException( + "Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true); + TagValue tagValue = TagValue.create(Util.extractStatus(error)); + assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED"); + } + + @Test + public void testErrorFuture() { + DeadlineExceededException error = + new DeadlineExceededException( + "Deadline exceeded", null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true); + TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateFailedFuture(error)); + assertThat(tagValue.asString()).isEqualTo("DEADLINE_EXCEEDED"); + } + + @Test + public void testCancelledFuture() { + TagValue tagValue = Util.extractStatusFromFuture(Futures.immediateCancelledFuture()); + assertThat(tagValue.asString()).isEqualTo("CANCELLED"); + } +}