diff --git a/README.md b/README.md index 645d26193c..8985acbec5 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.37.0') +implementation platform('com.google.cloud:libraries-bom:26.38.0') implementation 'com.google.cloud:google-cloud-bigtable' ``` diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java index 9c88aa62a2..f6a2527302 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java @@ -42,6 +42,7 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.monitoring.v3.CreateTimeSeriesRequest; import com.google.monitoring.v3.ProjectName; @@ -53,6 +54,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -85,6 +87,10 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter { private static final String APPLICATION_RESOURCE_PROJECT_ID = "project_id"; + // This the quota limit from Cloud Monitoring. More details in + // https://cloud.google.com/monitoring/quotas#custom_metrics_quotas. + private static final int EXPORT_BATCH_SIZE_LIMIT = 200; + private final MetricServiceClient client; private final String bigtableProjectId; @@ -216,19 +222,12 @@ private CompletableResultCode exportBigtableResourceMetrics(Collection future = - this.client.createServiceTimeSeriesCallable().futureCall(bigtableRequest); + ApiFuture> future = exportTimeSeries(projectName, bigtableTimeSeries); CompletableResultCode bigtableExportCode = new CompletableResultCode(); ApiFutures.addCallback( future, - new ApiFutureCallback() { + new ApiFutureCallback>() { @Override public void onFailure(Throwable throwable) { if (bigtableExportFailureLogged.compareAndSet(false, true)) { @@ -245,7 +244,7 @@ public void onFailure(Throwable throwable) { } @Override - public void onSuccess(Empty empty) { + public void onSuccess(List emptyList) { // When an export succeeded reset the export failure flag to false so if there's a // transient failure it'll be logged. bigtableExportFailureLogged.set(false); @@ -290,22 +289,17 @@ private CompletableResultCode exportApplicationResourceMetrics( // Construct the request. The project id will be the project id of the detected monitored // resource. - ApiFuture gceOrGkeFuture; + ApiFuture> gceOrGkeFuture; CompletableResultCode exportCode = new CompletableResultCode(); try { ProjectName projectName = ProjectName.of(applicationResource.getLabelsOrThrow(APPLICATION_RESOURCE_PROJECT_ID)); - CreateTimeSeriesRequest request = - CreateTimeSeriesRequest.newBuilder() - .setName(projectName.toString()) - .addAllTimeSeries(timeSeries) - .build(); - gceOrGkeFuture = this.client.createServiceTimeSeriesCallable().futureCall(request); + gceOrGkeFuture = exportTimeSeries(projectName, timeSeries); ApiFutures.addCallback( gceOrGkeFuture, - new ApiFutureCallback() { + new ApiFutureCallback>() { @Override public void onFailure(Throwable throwable) { if (applicationExportFailureLogged.compareAndSet(false, true)) { @@ -322,7 +316,7 @@ public void onFailure(Throwable throwable) { } @Override - public void onSuccess(Empty empty) { + public void onSuccess(List emptyList) { // When an export succeeded reset the export failure flag to false so if there's a // transient failure it'll be logged. applicationExportFailureLogged.set(false); @@ -341,6 +335,23 @@ public void onSuccess(Empty empty) { return exportCode; } + private ApiFuture> exportTimeSeries( + ProjectName projectName, List timeSeries) { + List> batchResults = new ArrayList<>(); + + for (List batch : Iterables.partition(timeSeries, EXPORT_BATCH_SIZE_LIMIT)) { + CreateTimeSeriesRequest req = + CreateTimeSeriesRequest.newBuilder() + .setName(projectName.toString()) + .addAllTimeSeries(batch) + .build(); + ApiFuture f = this.client.createServiceTimeSeriesCallable().futureCall(req); + batchResults.add(f); + } + + return ApiFutures.allAsList(batchResults); + } + @Override public CompletableResultCode flush() { if (lastExportCode != null) { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporterTest.java index a0b9c058dc..81629e2d9d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporterTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporterTest.java @@ -50,7 +50,9 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -220,6 +222,80 @@ public void testExportingHistogramData() { assertThat(timeSeries.getPoints(0).getInterval().getEndTime().getNanos()).isEqualTo(endEpoch); } + @Test + public void testExportingSumDataInBatches() { + ArgumentCaptor argumentCaptor = + ArgumentCaptor.forClass(CreateTimeSeriesRequest.class); + + UnaryCallable mockCallable = mock(UnaryCallable.class); + when(mockMetricServiceStub.createServiceTimeSeriesCallable()).thenReturn(mockCallable); + ApiFuture future = ApiFutures.immediateFuture(Empty.getDefaultInstance()); + when(mockCallable.futureCall(argumentCaptor.capture())).thenReturn(future); + + long startEpoch = 10; + long endEpoch = 15; + + Collection toExport = new ArrayList<>(); + for (int i = 0; i < 250; i++) { + Attributes testAttributes = + Attributes.builder() + .put(BIGTABLE_PROJECT_ID_KEY, projectId) + .put(INSTANCE_ID_KEY, instanceId) + .put(TABLE_ID_KEY, tableId + i) + .put(CLUSTER_ID_KEY, cluster) + .put(ZONE_ID_KEY, zone) + .put(APP_PROFILE_KEY, appProfileId) + .build(); + LongPointData longPointData = + ImmutableLongPointData.create(startEpoch, endEpoch, testAttributes, i); + + MetricData longData = + ImmutableMetricData.createLongSum( + resource, + scope, + "bigtable.googleapis.com/internal/client/retry_count", + "description", + "1", + ImmutableSumData.create( + true, AggregationTemporality.CUMULATIVE, ImmutableList.of(longPointData))); + toExport.add(longData); + } + + exporter.export(toExport); + + assertThat(argumentCaptor.getAllValues()).hasSize(2); + CreateTimeSeriesRequest firstRequest = argumentCaptor.getAllValues().get(0); + CreateTimeSeriesRequest secondRequest = argumentCaptor.getAllValues().get(1); + + assertThat(firstRequest.getTimeSeriesList()).hasSize(200); + assertThat(secondRequest.getTimeSeriesList()).hasSize(50); + + for (int i = 0; i < 250; i++) { + TimeSeries timeSeries; + if (i < 200) { + timeSeries = firstRequest.getTimeSeriesList().get(i); + } else { + timeSeries = secondRequest.getTimeSeriesList().get(i - 200); + } + + assertThat(timeSeries.getResource().getLabelsMap()) + .containsExactly( + BIGTABLE_PROJECT_ID_KEY.getKey(), projectId, + INSTANCE_ID_KEY.getKey(), instanceId, + TABLE_ID_KEY.getKey(), tableId + i, + CLUSTER_ID_KEY.getKey(), cluster, + ZONE_ID_KEY.getKey(), zone); + + assertThat(timeSeries.getMetric().getLabelsMap()).hasSize(2); + assertThat(timeSeries.getMetric().getLabelsMap()) + .containsAtLeast(APP_PROFILE_KEY.getKey(), appProfileId, CLIENT_UID_KEY.getKey(), taskId); + assertThat(timeSeries.getPoints(0).getValue().getInt64Value()).isEqualTo(i); + assertThat(timeSeries.getPoints(0).getInterval().getStartTime().getNanos()) + .isEqualTo(startEpoch); + assertThat(timeSeries.getPoints(0).getInterval().getEndTime().getNanos()).isEqualTo(endEpoch); + } + } + @Test public void testTimeSeriesForMetricWithGceOrGkeResource() { String gceProjectId = "fake-gce-project";