Skip to content
This repository was archived by the owner on May 8, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.37.0')
implementation platform('com.google.cloud:libraries-bom:26.38.0')

implementation 'com.google.cloud:google-cloud-bigtable'
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.monitoring.v3.CreateTimeSeriesRequest;
import com.google.monitoring.v3.ProjectName;
Expand All @@ -53,6 +54,7 @@
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -85,6 +87,10 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {

private static final String APPLICATION_RESOURCE_PROJECT_ID = "project_id";

// This the quota limit from Cloud Monitoring. More details in
// https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
private static final int EXPORT_BATCH_SIZE_LIMIT = 200;
Comment thread
rkaregar marked this conversation as resolved.

private final MetricServiceClient client;

private final String bigtableProjectId;
Expand Down Expand Up @@ -216,19 +222,12 @@ private CompletableResultCode exportBigtableResourceMetrics(Collection<MetricDat
}

ProjectName projectName = ProjectName.of(bigtableProjectId);
CreateTimeSeriesRequest bigtableRequest =
CreateTimeSeriesRequest.newBuilder()
.setName(projectName.toString())
.addAllTimeSeries(bigtableTimeSeries)
.build();

ApiFuture<Empty> future =
this.client.createServiceTimeSeriesCallable().futureCall(bigtableRequest);
ApiFuture<List<Empty>> future = exportTimeSeries(projectName, bigtableTimeSeries);

CompletableResultCode bigtableExportCode = new CompletableResultCode();
ApiFutures.addCallback(
future,
new ApiFutureCallback<Empty>() {
new ApiFutureCallback<List<Empty>>() {
Comment thread
rkaregar marked this conversation as resolved.
@Override
public void onFailure(Throwable throwable) {
if (bigtableExportFailureLogged.compareAndSet(false, true)) {
Expand All @@ -245,7 +244,7 @@ public void onFailure(Throwable throwable) {
}

@Override
public void onSuccess(Empty empty) {
public void onSuccess(List<Empty> emptyList) {
Comment thread
rkaregar marked this conversation as resolved.
// When an export succeeded reset the export failure flag to false so if there's a
// transient failure it'll be logged.
bigtableExportFailureLogged.set(false);
Expand Down Expand Up @@ -290,22 +289,17 @@ private CompletableResultCode exportApplicationResourceMetrics(

// Construct the request. The project id will be the project id of the detected monitored
// resource.
ApiFuture<Empty> gceOrGkeFuture;
ApiFuture<List<Empty>> gceOrGkeFuture;
Comment thread
rkaregar marked this conversation as resolved.
CompletableResultCode exportCode = new CompletableResultCode();
try {
ProjectName projectName =
ProjectName.of(applicationResource.getLabelsOrThrow(APPLICATION_RESOURCE_PROJECT_ID));
CreateTimeSeriesRequest request =
CreateTimeSeriesRequest.newBuilder()
.setName(projectName.toString())
.addAllTimeSeries(timeSeries)
.build();

gceOrGkeFuture = this.client.createServiceTimeSeriesCallable().futureCall(request);
gceOrGkeFuture = exportTimeSeries(projectName, timeSeries);

ApiFutures.addCallback(
gceOrGkeFuture,
new ApiFutureCallback<Empty>() {
new ApiFutureCallback<List<Empty>>() {
@Override
public void onFailure(Throwable throwable) {
if (applicationExportFailureLogged.compareAndSet(false, true)) {
Expand All @@ -322,7 +316,7 @@ public void onFailure(Throwable throwable) {
}

@Override
public void onSuccess(Empty empty) {
public void onSuccess(List<Empty> emptyList) {
// When an export succeeded reset the export failure flag to false so if there's a
// transient failure it'll be logged.
applicationExportFailureLogged.set(false);
Expand All @@ -341,6 +335,23 @@ public void onSuccess(Empty empty) {
return exportCode;
}

private ApiFuture<List<Empty>> exportTimeSeries(
ProjectName projectName, List<TimeSeries> timeSeries) {
List<ApiFuture<Empty>> batchResults = new ArrayList<>();

for (List<TimeSeries> batch : Iterables.partition(timeSeries, EXPORT_BATCH_SIZE_LIMIT)) {
CreateTimeSeriesRequest req =
CreateTimeSeriesRequest.newBuilder()
.setName(projectName.toString())
.addAllTimeSeries(batch)
.build();
ApiFuture<Empty> f = this.client.createServiceTimeSeriesCallable().futureCall(req);
batchResults.add(f);
}

return ApiFutures.allAsList(batchResults);
}

@Override
public CompletableResultCode flush() {
if (lastExportCode != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -220,6 +222,80 @@ public void testExportingHistogramData() {
assertThat(timeSeries.getPoints(0).getInterval().getEndTime().getNanos()).isEqualTo(endEpoch);
}

@Test
public void testExportingSumDataInBatches() {
ArgumentCaptor<CreateTimeSeriesRequest> argumentCaptor =
ArgumentCaptor.forClass(CreateTimeSeriesRequest.class);

UnaryCallable<CreateTimeSeriesRequest, Empty> mockCallable = mock(UnaryCallable.class);
when(mockMetricServiceStub.createServiceTimeSeriesCallable()).thenReturn(mockCallable);
ApiFuture<Empty> future = ApiFutures.immediateFuture(Empty.getDefaultInstance());
when(mockCallable.futureCall(argumentCaptor.capture())).thenReturn(future);

long startEpoch = 10;
long endEpoch = 15;

Collection<MetricData> toExport = new ArrayList<>();
for (int i = 0; i < 250; i++) {
Attributes testAttributes =
Attributes.builder()
.put(BIGTABLE_PROJECT_ID_KEY, projectId)
.put(INSTANCE_ID_KEY, instanceId)
.put(TABLE_ID_KEY, tableId + i)
.put(CLUSTER_ID_KEY, cluster)
.put(ZONE_ID_KEY, zone)
.put(APP_PROFILE_KEY, appProfileId)
.build();
LongPointData longPointData =
ImmutableLongPointData.create(startEpoch, endEpoch, testAttributes, i);

MetricData longData =
ImmutableMetricData.createLongSum(
resource,
scope,
"bigtable.googleapis.com/internal/client/retry_count",
"description",
"1",
ImmutableSumData.create(
true, AggregationTemporality.CUMULATIVE, ImmutableList.of(longPointData)));
toExport.add(longData);
}

exporter.export(toExport);

assertThat(argumentCaptor.getAllValues()).hasSize(2);
CreateTimeSeriesRequest firstRequest = argumentCaptor.getAllValues().get(0);
CreateTimeSeriesRequest secondRequest = argumentCaptor.getAllValues().get(1);

assertThat(firstRequest.getTimeSeriesList()).hasSize(200);
assertThat(secondRequest.getTimeSeriesList()).hasSize(50);

for (int i = 0; i < 250; i++) {
TimeSeries timeSeries;
if (i < 200) {
timeSeries = firstRequest.getTimeSeriesList().get(i);
} else {
timeSeries = secondRequest.getTimeSeriesList().get(i - 200);
}

assertThat(timeSeries.getResource().getLabelsMap())
.containsExactly(
BIGTABLE_PROJECT_ID_KEY.getKey(), projectId,
INSTANCE_ID_KEY.getKey(), instanceId,
TABLE_ID_KEY.getKey(), tableId + i,
CLUSTER_ID_KEY.getKey(), cluster,
ZONE_ID_KEY.getKey(), zone);

assertThat(timeSeries.getMetric().getLabelsMap()).hasSize(2);
assertThat(timeSeries.getMetric().getLabelsMap())
.containsAtLeast(APP_PROFILE_KEY.getKey(), appProfileId, CLIENT_UID_KEY.getKey(), taskId);
assertThat(timeSeries.getPoints(0).getValue().getInt64Value()).isEqualTo(i);
assertThat(timeSeries.getPoints(0).getInterval().getStartTime().getNanos())
.isEqualTo(startEpoch);
assertThat(timeSeries.getPoints(0).getInterval().getEndTime().getNanos()).isEqualTo(endEpoch);
}
}

@Test
public void testTimeSeriesForMetricWithGceOrGkeResource() {
String gceProjectId = "fake-gce-project";
Expand Down