From ed5cd19911ed89b41981afc5f95249b919a5941a Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Sun, 20 Apr 2025 01:04:46 +0100 Subject: [PATCH 01/17] Adding new MetricSet (cherry picked from commit 357368ce7d4275449321f64fc0817377956b68e4) --- .../apache/storm/kafka/spout/KafkaSpout.java | 2 +- .../metrics2/KafkaOffsetMetricManager.java | 7 + .../KafkaOffsetPartitionAndTopicMetrics.java | 211 ++++++++++++++++++ .../metrics2/KafkaOffsetPartitionMetrics.java | 2 +- 4 files changed, 220 insertions(+), 2 deletions(-) create mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index a328aec5d70..4a7dee0f498 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -634,7 +634,7 @@ private void refreshAssignment() { boolean partitionChanged = topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener); if (partitionChanged && canRegisterMetrics()) { LOG.info("Partitions assignments has changed, updating metrics."); - kafkaOffsetMetricManager.registerMetricsForNewTopicPartitions(assignedPartitions); + kafkaOffsetMetricManager.registerPartitionAndTopicLevelMetrics(assignedPartitions); } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java index 2b150797ffe..3a3a67a3a55 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java @@ -75,6 +75,13 @@ public void registerMetricsForNewTopicPartitions(Set newAssignme } } + public void registerPartitionAndTopicLevelMetrics(Set newAssignment) { + + KafkaOffsetPartitionAndTopicMetrics topicPartitionMetricSet + = new KafkaOffsetPartitionAndTopicMetrics<>(offsetManagerSupplier, adminSupplier,newAssignment); + topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet); + } + public Map getTopicPartitionMetricsMap() { return topicPartitionMetricsMap; } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java new file mode 100644 index 00000000000..4516ad957ad --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -0,0 +1,211 @@ +package org.apache.storm.kafka.spout.metrics2; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.storm.kafka.spout.internal.OffsetManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +/** + * This class is used compute the partition and topic level offset metrics. + *

+ * Partition level metrics are: + * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition + * topicName/partition_{number}/latestTimeOffset //gives end offset of the partition + * topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout + * topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout + * topicName/partition_{number}/spoutLag // the delta between latestTimeOffset and latestCompletedOffset + * topicName/partition_{number}/recordsInPartition // total number of records in the partition + *

+ *

+ * Topic level metrics are: + * topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout + * topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout + * topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout + * topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout + * topicName/totalSpoutLag // total spout lag of all the associated partitions of this spout + * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout + *

+ */ +public class KafkaOffsetPartitionAndTopicMetrics implements MetricSet { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetPartitionAndTopicMetrics.class); + private final Supplier> offsetManagerSupplier; + private final Supplier adminSupplier; + private final Set assignment; + private Map topicMetricsMap; + + + public KafkaOffsetPartitionAndTopicMetrics(Supplier> offsetManagerSupplier, Supplier adminSupplier, Set assignment) { + this.offsetManagerSupplier = offsetManagerSupplier; + this.adminSupplier = adminSupplier; + this.assignment = assignment; + } + + @Override + public Map getMetrics() { + + Map metrics = new HashMap<>(); + + for (TopicPartition topicPartition : assignment) { + + String topic=topicPartition.topic(); + KafkaOffsetTopicMetrics topicMetrics = topicMetricsMap.get(topic); + if (topicMetrics == null) { + topicMetrics = new KafkaOffsetTopicMetrics(topic); + topicMetricsMap.put(topic, topicMetrics); + } + + String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition(); + KafkaOffsetTopicMetrics finalTopicMetrics = topicMetrics; + Gauge spoutLagGauge = () -> { + Map endOffsets = getEndOffsets(Collections.singleton(topicPartition)); + if (endOffsets == null || endOffsets.isEmpty()) { + LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition); + return 0L; + } + // add value to topic level metric + OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition); + Long ret = endOffsets.get(topicPartition) - offsetManager.getCommittedOffset(); + finalTopicMetrics.totalSpoutLag += ret; + return ret; + }; + + Gauge earliestTimeOffsetGauge = () -> { + Map beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition)); + if (beginningOffsets == null || beginningOffsets.isEmpty()) { + LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition); + return 0L; + } + // add value to topic level metric + Long 
ret = beginningOffsets.get(topicPartition); + finalTopicMetrics.totalEarliestTimeOffset += ret; + return ret; + }; + + Gauge latestTimeOffsetGauge = () -> { + Map endOffsets = getEndOffsets(Collections.singleton(topicPartition)); + if (endOffsets == null || endOffsets.isEmpty()) { + LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition); + return 0L; + } + // add value to topic level metric + Long ret = endOffsets.get(topicPartition); + finalTopicMetrics.totalLatestTimeOffset += ret; + return ret; + }; + + Gauge latestEmittedOffsetGauge = () -> { + // add value to topic level metric + OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition); + Long ret = offsetManager.getLatestEmittedOffset(); + finalTopicMetrics.totalLatestEmittedOffset+=ret; + return ret; + }; + + Gauge latestCompletedOffsetGauge = () -> { + // add value to topic level metric + OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition); + Long ret = offsetManager.getCommittedOffset(); + finalTopicMetrics.totalLatestCompletedOffset+=ret; + return ret; + }; + + Gauge recordsInPartitionGauge = () -> { + Map endOffsets = getEndOffsets(Collections.singleton(topicPartition)); + if (endOffsets == null || endOffsets.isEmpty()) { + LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition); + return 0L; + } + Map beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition)); + if (beginningOffsets == null || beginningOffsets.isEmpty()) { + LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition); + return 0L; + } + // add value to topic level metric + Long ret = endOffsets.get(topicPartition) - beginningOffsets.get(topicPartition); + finalTopicMetrics.totalRecordsInPartitions+=ret; + return ret; + }; + + metrics.put(metricPath + "/" + "spoutLag", spoutLagGauge); + metrics.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffsetGauge); + 
metrics.put(metricPath + "/" + "latestTimeOffset", latestTimeOffsetGauge); + metrics.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffsetGauge); + metrics.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffsetGauge); + metrics.put(metricPath + "/" + "recordsInPartition", recordsInPartitionGauge); + + } + + metrics.putAll(topicMetricsMap); + + return metrics; + + } + + private Map getBeginningOffsets(Set topicPartitions) { + Admin admin = adminSupplier.get(); + if (admin == null) { + LOG.error("Kafka admin object is null, returning 0."); + return Collections.EMPTY_MAP; + } + + Map beginningOffsets; + try { + beginningOffsets = getOffsets(admin, topicPartitions, OffsetSpec.earliest()); + } catch (RetriableException | ExecutionException | InterruptedException e) { + LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartitions, e); + return Collections.EMPTY_MAP; + } + return beginningOffsets; + } + + private Map getEndOffsets(Set topicPartitions) { + Admin admin = adminSupplier.get(); + if (admin == null) { + LOG.error("Kafka admin object is null, returning 0."); + return Collections.EMPTY_MAP; + } + + Map endOffsets; + try { + endOffsets = getOffsets(admin, topicPartitions, OffsetSpec.latest()); + } catch (RetriableException | ExecutionException | InterruptedException e) { + LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartitions, e); + return Collections.EMPTY_MAP; + } + return endOffsets; + } + + private static Map getOffsets(Admin admin, Set topicPartitions, OffsetSpec offsetSpec) + throws InterruptedException, ExecutionException { + + Map offsetSpecMap = new HashMap<>(); + for (TopicPartition topicPartition : topicPartitions) { + offsetSpecMap.put(topicPartition, offsetSpec); + } + Map ret = new HashMap<>(); + ListOffsetsResult listOffsetsResult = admin.listOffsets(offsetSpecMap); + KafkaFuture> all = listOffsetsResult.all(); + Map topicPartitionListOffsetsResultInfoMap = 
all.get(); + for (Map.Entry entry : + topicPartitionListOffsetsResultInfoMap.entrySet()) { + ret.put(entry.getKey(), entry.getValue().offset()); + } + return ret; + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java index 6b29eefc919..0bb0d2c2168 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java @@ -101,7 +101,7 @@ public Long getValue() { } // add value to topic level metric Long ret = beginningOffsets.get(topicPartition); - topicMetrics.totalEarliestTimeOffset += beginningOffsets.get(topicPartition); + topicMetrics.totalEarliestTimeOffset += ret; return ret; } }; From 4d70aec43253c9ad0ee0c12346d2a9d72513b4ec Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Sun, 20 Apr 2025 19:38:11 +0100 Subject: [PATCH 02/17] Adding unit tests (cherry picked from commit 7403cebd4cd66dd7d3ade78027734dffff436696) --- .../apache/storm/kafka/spout/KafkaSpout.java | 1 + .../metrics2/KafkaOffsetMetricManager.java | 23 +--- .../KafkaOffsetPartitionAndTopicMetrics.java | 9 +- ...fkaOffsetPartitionAndTopicMetricsTest.java | 106 ++++++++++++++++++ 4 files changed, 114 insertions(+), 25 deletions(-) create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 4a7dee0f498..2b35fdd9272 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -57,6 +57,7 @@ import org.apache.storm.kafka.spout.internal.OffsetManager; import org.apache.storm.kafka.spout.internal.Timer; import org.apache.storm.kafka.spout.metrics2.KafkaOffsetMetricManager; +import org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionAndTopicMetrics; import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java index 3a3a67a3a55..c78b5637e89 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java @@ -54,31 +54,10 @@ public KafkaOffsetMetricManager(Supplier> off LOG.info("Running KafkaOffsetMetricManager"); } - public void registerMetricsForNewTopicPartitions(Set newAssignment) { - for (TopicPartition topicPartition : newAssignment) { - if (!topicPartitionMetricsMap.containsKey(topicPartition)) { - LOG.info("Registering metric for topicPartition: {}", topicPartition); - // create topic level metrics for given topic if absent - String topic = topicPartition.topic(); - KafkaOffsetTopicMetrics topicMetrics = topicMetricsMap.get(topic); - if (topicMetrics == null) { - topicMetrics = new KafkaOffsetTopicMetrics(topic); - topicMetricsMap.put(topic, topicMetrics); - topologyContext.registerMetricSet("kafkaOffset", topicMetrics); - } - - KafkaOffsetPartitionMetrics topicPartitionMetricSet - = new KafkaOffsetPartitionMetrics<>(offsetManagerSupplier, adminSupplier, topicPartition, topicMetrics); - topicPartitionMetricsMap.put(topicPartition, 
topicPartitionMetricSet); - topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet); - } - } - } - public void registerPartitionAndTopicLevelMetrics(Set newAssignment) { KafkaOffsetPartitionAndTopicMetrics topicPartitionMetricSet - = new KafkaOffsetPartitionAndTopicMetrics<>(offsetManagerSupplier, adminSupplier,newAssignment); + = new KafkaOffsetPartitionAndTopicMetrics(offsetManagerSupplier, adminSupplier,newAssignment); topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet); } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java index 4516ad957ad..973ce99f313 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -41,19 +41,20 @@ * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout *

*/ -public class KafkaOffsetPartitionAndTopicMetrics implements MetricSet { +public class KafkaOffsetPartitionAndTopicMetrics implements MetricSet { private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetPartitionAndTopicMetrics.class); private final Supplier> offsetManagerSupplier; private final Supplier adminSupplier; private final Set assignment; - private Map topicMetricsMap; + private final Map topicMetricsMap; public KafkaOffsetPartitionAndTopicMetrics(Supplier> offsetManagerSupplier, Supplier adminSupplier, Set assignment) { this.offsetManagerSupplier = offsetManagerSupplier; this.adminSupplier = adminSupplier; this.assignment = assignment; + this.topicMetricsMap=new HashMap<>(); } @Override @@ -151,7 +152,9 @@ public Map getMetrics() { } - metrics.putAll(topicMetricsMap); + for( KafkaOffsetTopicMetrics kafkaOffsetTopicMetrics : topicMetricsMap.values()){ + metrics.putAll(kafkaOffsetTopicMetrics.getMetrics()); + } return metrics; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java new file mode 100644 index 00000000000..0475155a993 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java @@ -0,0 +1,106 @@ +package org.apache.storm.kafka.spout.metric2; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.OffsetManager; +import org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionAndTopicMetrics; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; 
+import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.*; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class KafkaOffsetPartitionAndTopicMetricsTest { + + private Set assignment; + private Admin admin; + private HashMap offsetManagers; + private ListOffsetsResult listOffsetsResult; + private KafkaFuture> kafkaFuture; + private Map topicPartitionListOffsetsResultInfoMap; + + @Test + public void registerPartitionAndTopicMetrics() throws ExecutionException, InterruptedException { + + TopicPartition tAp1 = new TopicPartition("topicA",1); + TopicPartition tAp2 = new TopicPartition("topicA",2); + TopicPartition tBp1 = new TopicPartition("topicB",1); + TopicPartition tBp2 = new TopicPartition("topicB",2); + + ListOffsetsResult.ListOffsetsResultInfo tAp1ListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tAp2ListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp1ListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp2ListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400,System.currentTimeMillis(),Optional.empty()); + + topicPartitionListOffsetsResultInfoMap = new HashMap<>(); + + topicPartitionListOffsetsResultInfoMap.put(tAp1,tAp1ListOffsetsResultInfo); + topicPartitionListOffsetsResultInfoMap.put(tAp2,tAp2ListOffsetsResultInfo); + topicPartitionListOffsetsResultInfoMap.put(tBp1,tBp1ListOffsetsResultInfo); + 
topicPartitionListOffsetsResultInfoMap.put(tBp2,tBp2ListOffsetsResultInfo); + + kafkaFuture = mock(KafkaFuture.class); + when(kafkaFuture.get()).thenReturn(topicPartitionListOffsetsResultInfoMap); + + listOffsetsResult = mock(ListOffsetsResult.class); + when(listOffsetsResult.all()).thenReturn(kafkaFuture); + + admin=mock(Admin.class); + when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult); + + offsetManagers= new HashMap<>(); + + OffsetManager offsetManagerTaP1 = mock(OffsetManager.class); + when(offsetManagerTaP1.getCommittedOffset()).thenReturn(90L); + + OffsetManager offsetManagerTaP2 = mock(OffsetManager.class); + when(offsetManagerTaP2.getCommittedOffset()).thenReturn(170L); + + OffsetManager offsetManagerTbP1 = mock(OffsetManager.class); + when(offsetManagerTbP1.getCommittedOffset()).thenReturn(200L); + + OffsetManager offsetManagerTbP2 = mock(OffsetManager.class); + when(offsetManagerTbP2.getCommittedOffset()).thenReturn(350L); + + offsetManagers.put(tAp1,offsetManagerTaP1); + offsetManagers.put(tAp2,offsetManagerTaP2); + offsetManagers.put(tBp1,offsetManagerTbP1); + offsetManagers.put(tBp2,offsetManagerTbP2); + + assignment=new HashSet<>(); + assignment.add(tAp1); + assignment.add(tAp2); + assignment.add(tBp1); + assignment.add(tBp2); + + + KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics = new KafkaOffsetPartitionAndTopicMetrics(() -> Collections.unmodifiableMap(offsetManagers),() -> admin,assignment); + Map result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); + Gauge g1= (Gauge) result.get("topicA/partition_1/spoutLag"); + Gauge g2= (Gauge) result.get("topicA/partition_2/spoutLag"); + Gauge g3= (Gauge) result.get("topicB/partition_1/spoutLag"); + Gauge g4= (Gauge) result.get("topicB/partition_2/spoutLag"); + assertEquals(g1.getValue(),10L); + assertEquals(g2.getValue(),30L); + assertEquals(g3.getValue(),100L); + assertEquals(g4.getValue(),50L); + + Gauge gATotal= (Gauge) result.get("topicA/totalSpoutLag"); + 
assertEquals(gATotal.getValue(),40L); + Gauge gBTotal= (Gauge) result.get("topicB/totalSpoutLag"); + assertEquals(gBTotal.getValue(),150L); + + + } +} From eef79aa03793d65bdfe9c2ca00e7d4a64f17a3ef Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Sun, 20 Apr 2025 19:59:56 +0100 Subject: [PATCH 03/17] Adding unit tests (cherry picked from commit 4a5c383a7d59ba7dd4a992e070205ad59c209fc7) --- .../KafkaOffsetPartitionAndTopicMetrics.java | 6 +++--- ...KafkaOffsetPartitionAndTopicMetricsTest.java | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java index 973ce99f313..a14a44c1d1e 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -47,19 +47,19 @@ public class KafkaOffsetPartitionAndTopicMetrics implements MetricSet { private final Supplier> offsetManagerSupplier; private final Supplier adminSupplier; private final Set assignment; - private final Map topicMetricsMap; - public KafkaOffsetPartitionAndTopicMetrics(Supplier> offsetManagerSupplier, Supplier adminSupplier, Set assignment) { this.offsetManagerSupplier = offsetManagerSupplier; this.adminSupplier = adminSupplier; this.assignment = assignment; - this.topicMetricsMap=new HashMap<>(); + } @Override public Map getMetrics() { + Map topicMetricsMap=new HashMap<>(); + Map metrics = new HashMap<>(); for (TopicPartition topicPartition : assignment) { diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java index 0475155a993..224a2d6cddb 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java @@ -101,6 +101,23 @@ public void registerPartitionAndTopicMetrics() throws ExecutionException, Interr Gauge gBTotal= (Gauge) result.get("topicB/totalSpoutLag"); assertEquals(gBTotal.getValue(),150L); + //get the metrics a second time. Values should be the same. In particular, the total values for the topic should not accumulate. Each call to getMetrics should reset the total values. + + result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); + g1= (Gauge) result.get("topicA/partition_1/spoutLag"); + g2= (Gauge) result.get("topicA/partition_2/spoutLag"); + g3= (Gauge) result.get("topicB/partition_1/spoutLag"); + g4= (Gauge) result.get("topicB/partition_2/spoutLag"); + assertEquals(g1.getValue(),10L); + assertEquals(g2.getValue(),30L); + assertEquals(g3.getValue(),100L); + assertEquals(g4.getValue(),50L); + + gATotal= (Gauge) result.get("topicA/totalSpoutLag"); + assertEquals(gATotal.getValue(),40L); + gBTotal= (Gauge) result.get("topicB/totalSpoutLag"); + assertEquals(gBTotal.getValue(),150L); + } } From 57ea5b3765a1f4a16f7e79036fee95efe1d680ae Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Tue, 22 Apr 2025 22:20:06 +0100 Subject: [PATCH 04/17] Adding unit tests (cherry picked from commit 7a4639cd808d07ee4fe0f0ef8b3adb8ed2ea9651) --- .../KafkaOffsetPartitionAndTopicMetrics.java | 12 +- ...fkaOffsetPartitionAndTopicMetricsTest.java | 133 ++++++++++++++---- 2 files changed, 111 insertions(+), 34 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java index a14a44c1d1e..a4c3b4f4ddf 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -79,7 +79,7 @@ public Map getMetrics() { LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition); return 0L; } - // add value to topic level metric + OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition); Long ret = endOffsets.get(topicPartition) - offsetManager.getCommittedOffset(); finalTopicMetrics.totalSpoutLag += ret; @@ -92,7 +92,7 @@ public Map getMetrics() { LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition); return 0L; } - // add value to topic level metric + Long ret = beginningOffsets.get(topicPartition); finalTopicMetrics.totalEarliestTimeOffset += ret; return ret; @@ -104,14 +104,14 @@ public Map getMetrics() { LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition); return 0L; } - // add value to topic level metric + Long ret = endOffsets.get(topicPartition); finalTopicMetrics.totalLatestTimeOffset += ret; return ret; }; Gauge latestEmittedOffsetGauge = () -> { - // add value to topic level metric + OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition); Long ret = offsetManager.getLatestEmittedOffset(); finalTopicMetrics.totalLatestEmittedOffset+=ret; @@ -119,7 +119,7 @@ public Map getMetrics() { }; Gauge latestCompletedOffsetGauge = () -> { - // add value to topic level metric + OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition); Long ret = offsetManager.getCommittedOffset(); finalTopicMetrics.totalLatestCompletedOffset+=ret; @@ -137,7 +137,7 @@ public 
Map getMetrics() { LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition); return 0L; } - // add value to topic level metric + Long ret = endOffsets.get(topicPartition) - beginningOffsets.get(topicPartition); finalTopicMetrics.totalRecordsInPartitions+=ret; return ret; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java index 224a2d6cddb..a0424dddda3 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java @@ -8,6 +8,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.internal.OffsetManager; import org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionAndTopicMetrics; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; @@ -16,50 +17,51 @@ import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @ExtendWith(MockitoExtension.class) public class KafkaOffsetPartitionAndTopicMetricsTest { private Set assignment; - private Admin admin; + private Admin admin = mock(Admin.class); private HashMap offsetManagers; - private ListOffsetsResult listOffsetsResult; - private KafkaFuture> kafkaFuture; - private Map topicPartitionListOffsetsResultInfoMap; + private ListOffsetsResult listOffsetsResultEarliest; + private KafkaFuture kafkaFuture = 
mock(KafkaFuture.class); + + @BeforeEach + public void initializeTests() { + reset(admin, kafkaFuture); + + } @Test - public void registerPartitionAndTopicMetrics() throws ExecutionException, InterruptedException { + public void registerMetricsGetLatest() throws ExecutionException, InterruptedException { TopicPartition tAp1 = new TopicPartition("topicA",1); TopicPartition tAp2 = new TopicPartition("topicA",2); TopicPartition tBp1 = new TopicPartition("topicB",1); TopicPartition tBp2 = new TopicPartition("topicB",2); - ListOffsetsResult.ListOffsetsResultInfo tAp1ListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100,System.currentTimeMillis(),Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tAp2ListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200,System.currentTimeMillis(),Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp1ListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300,System.currentTimeMillis(),Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp2ListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tAp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tAp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400,System.currentTimeMillis(),Optional.empty()); - topicPartitionListOffsetsResultInfoMap = new HashMap<>(); + Map topicPartitionLatestListOffsetsResultInfoMap; - 
topicPartitionListOffsetsResultInfoMap.put(tAp1,tAp1ListOffsetsResultInfo); - topicPartitionListOffsetsResultInfoMap.put(tAp2,tAp2ListOffsetsResultInfo); - topicPartitionListOffsetsResultInfoMap.put(tBp1,tBp1ListOffsetsResultInfo); - topicPartitionListOffsetsResultInfoMap.put(tBp2,tBp2ListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap = new HashMap<>(); + topicPartitionLatestListOffsetsResultInfoMap.put(tAp1,tAp1LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(tAp2,tAp2LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(tBp1,tBp1LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(tBp2,tBp2LatestListOffsetsResultInfo); - kafkaFuture = mock(KafkaFuture.class); - when(kafkaFuture.get()).thenReturn(topicPartitionListOffsetsResultInfoMap); + when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap); - listOffsetsResult = mock(ListOffsetsResult.class); - when(listOffsetsResult.all()).thenReturn(kafkaFuture); + listOffsetsResultEarliest = mock(ListOffsetsResult.class); + when(listOffsetsResultEarliest.all()).thenReturn(kafkaFuture); admin=mock(Admin.class); - when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult); - - offsetManagers= new HashMap<>(); + when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResultEarliest); OffsetManager offsetManagerTaP1 = mock(OffsetManager.class); when(offsetManagerTaP1.getCommittedOffset()).thenReturn(90L); @@ -73,6 +75,7 @@ public void registerPartitionAndTopicMetrics() throws ExecutionException, Interr OffsetManager offsetManagerTbP2 = mock(OffsetManager.class); when(offsetManagerTbP2.getCommittedOffset()).thenReturn(350L); + offsetManagers= new HashMap<>(); offsetManagers.put(tAp1,offsetManagerTaP1); offsetManagers.put(tAp2,offsetManagerTaP2); offsetManagers.put(tBp1,offsetManagerTbP1); @@ -104,10 +107,10 @@ public void registerPartitionAndTopicMetrics() throws ExecutionException, Interr 
//get the metrics a second time. Values should be the same. In particular, the total values for the topic should not accumulate. Each call to getMetrics should reset the total values. result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); - g1= (Gauge) result.get("topicA/partition_1/spoutLag"); - g2= (Gauge) result.get("topicA/partition_2/spoutLag"); - g3= (Gauge) result.get("topicB/partition_1/spoutLag"); - g4= (Gauge) result.get("topicB/partition_2/spoutLag"); + g1= (Gauge) result.get("topicA/partition_1/spoutLag"); + g2= (Gauge) result.get("topicA/partition_2/spoutLag"); + g3= (Gauge) result.get("topicB/partition_1/spoutLag"); + g4= (Gauge) result.get("topicB/partition_2/spoutLag"); assertEquals(g1.getValue(),10L); assertEquals(g2.getValue(),30L); assertEquals(g3.getValue(),100L); @@ -119,5 +122,79 @@ public void registerPartitionAndTopicMetrics() throws ExecutionException, Interr assertEquals(gBTotal.getValue(),150L); + } + + @Test + public void registerMetricsGetEarliest() throws ExecutionException, InterruptedException { + + TopicPartition tAp1 = new TopicPartition("topicA",1); + TopicPartition tAp2 = new TopicPartition("topicA",2); + TopicPartition tBp1 = new TopicPartition("topicB",1); + TopicPartition tBp2 = new TopicPartition("topicB",2); + + ListOffsetsResult.ListOffsetsResultInfo tAp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tAp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1,System.currentTimeMillis(),Optional.empty()); + + Map 
topicPartitionEarliestListOffsetsResultInfoMap; + + topicPartitionEarliestListOffsetsResultInfoMap = new HashMap<>(); + topicPartitionEarliestListOffsetsResultInfoMap.put(tAp1,tAp1EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(tAp2,tAp2EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(tBp1,tBp1EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(tBp2,tBp2EarliestListOffsetsResultInfo); + + when(kafkaFuture.get()).thenReturn(topicPartitionEarliestListOffsetsResultInfoMap); + + listOffsetsResultEarliest = mock(ListOffsetsResult.class); + when(listOffsetsResultEarliest.all()).thenReturn(kafkaFuture); + + admin=mock(Admin.class); + when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResultEarliest); + + offsetManagers= new HashMap<>(); + + assignment=new HashSet<>(); + assignment.add(tAp1); + assignment.add(tAp2); + assignment.add(tBp1); + assignment.add(tBp2); + + KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics = new KafkaOffsetPartitionAndTopicMetrics(() -> Collections.unmodifiableMap(offsetManagers),() -> admin,assignment); + Map result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); + Gauge g1= (Gauge) result.get("topicA/partition_1/earliestTimeOffset"); + Gauge g2= (Gauge) result.get("topicA/partition_2/earliestTimeOffset"); + Gauge g3= (Gauge) result.get("topicB/partition_1/earliestTimeOffset"); + Gauge g4= (Gauge) result.get("topicB/partition_2/earliestTimeOffset"); + assertEquals(g1.getValue(),1L); + assertEquals(g2.getValue(),1L); + assertEquals(g3.getValue(),1L); + assertEquals(g4.getValue(),1L); + + Gauge gATotal= (Gauge) result.get("topicA/totalEarliestTimeOffset"); + assertEquals(2L,gATotal.getValue()); + Gauge gBTotal= (Gauge) result.get("topicB/totalEarliestTimeOffset"); + assertEquals(2L,gBTotal.getValue()); + + //get the metrics a second time. Values should be the same. 
In particular, the total values for the topic should not accumulate. Each call to getMetrics should reset the total values. + + result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); + + g1=(Gauge) result.get("topicA/partition_1/earliestTimeOffset"); + g2=(Gauge) result.get("topicA/partition_2/earliestTimeOffset"); + g3=(Gauge) result.get("topicB/partition_1/earliestTimeOffset"); + g4=(Gauge) result.get("topicB/partition_2/earliestTimeOffset"); + assertEquals(g1.getValue(),1L); + assertEquals(g2.getValue(),1L); + assertEquals(g3.getValue(),1L); + assertEquals(g4.getValue(),1L); + + gATotal= (Gauge) result.get("topicA/totalEarliestTimeOffset"); + assertEquals(2L,gATotal.getValue()); + gBTotal= (Gauge) result.get("topicB/totalEarliestTimeOffset"); + assertEquals(2L,gBTotal.getValue()); + + } } From fa6924855bfa901d56e2648b5c0e341e556b46fa Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Tue, 22 Apr 2025 22:40:31 +0100 Subject: [PATCH 05/17] Adding unit tests (cherry picked from commit 1e1edb5b28cc97593197bfb38b5620b7eb3af0b4) --- .../KafkaOffsetPartitionAndTopicMetrics.java | 14 +- ...fkaOffsetPartitionAndTopicMetricsTest.java | 182 +++++++++--------- 2 files changed, 97 insertions(+), 99 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java index a4c3b4f4ddf..707796d9d9c 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -58,20 +58,20 @@ public KafkaOffsetPartitionAndTopicMetrics(Supplier getMetrics() { - Map topicMetricsMap=new HashMap<>(); + Map topicMetricsMap = new HashMap<>(); Map metrics = new HashMap<>(); for (TopicPartition 
topicPartition : assignment) { - String topic=topicPartition.topic(); + String topic = topicPartition.topic(); KafkaOffsetTopicMetrics topicMetrics = topicMetricsMap.get(topic); if (topicMetrics == null) { topicMetrics = new KafkaOffsetTopicMetrics(topic); topicMetricsMap.put(topic, topicMetrics); } - String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition(); + String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition(); KafkaOffsetTopicMetrics finalTopicMetrics = topicMetrics; Gauge spoutLagGauge = () -> { Map endOffsets = getEndOffsets(Collections.singleton(topicPartition)); @@ -114,7 +114,7 @@ public Map getMetrics() { OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition); Long ret = offsetManager.getLatestEmittedOffset(); - finalTopicMetrics.totalLatestEmittedOffset+=ret; + finalTopicMetrics.totalLatestEmittedOffset += ret; return ret; }; @@ -122,7 +122,7 @@ public Map getMetrics() { OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition); Long ret = offsetManager.getCommittedOffset(); - finalTopicMetrics.totalLatestCompletedOffset+=ret; + finalTopicMetrics.totalLatestCompletedOffset += ret; return ret; }; @@ -139,7 +139,7 @@ public Map getMetrics() { } Long ret = endOffsets.get(topicPartition) - beginningOffsets.get(topicPartition); - finalTopicMetrics.totalRecordsInPartitions+=ret; + finalTopicMetrics.totalRecordsInPartitions += ret; return ret; }; @@ -152,7 +152,7 @@ public Map getMetrics() { } - for( KafkaOffsetTopicMetrics kafkaOffsetTopicMetrics : topicMetricsMap.values()){ + for (KafkaOffsetTopicMetrics kafkaOffsetTopicMetrics : topicMetricsMap.values()) { metrics.putAll(kafkaOffsetTopicMetrics.getMetrics()); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java index a0424dddda3..7fcd66681ae 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java @@ -37,30 +37,30 @@ public void initializeTests() { @Test public void registerMetricsGetLatest() throws ExecutionException, InterruptedException { - TopicPartition tAp1 = new TopicPartition("topicA",1); - TopicPartition tAp2 = new TopicPartition("topicA",2); - TopicPartition tBp1 = new TopicPartition("topicB",1); - TopicPartition tBp2 = new TopicPartition("topicB",2); + TopicPartition tAp1 = new TopicPartition("topicA", 1); + TopicPartition tAp2 = new TopicPartition("topicA", 2); + TopicPartition tBp1 = new TopicPartition("topicB", 1); + TopicPartition tBp2 = new TopicPartition("topicB", 2); - ListOffsetsResult.ListOffsetsResultInfo tAp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100,System.currentTimeMillis(),Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tAp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200,System.currentTimeMillis(),Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300,System.currentTimeMillis(),Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tAp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tAp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200, 
System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400, System.currentTimeMillis(), Optional.empty()); - Map topicPartitionLatestListOffsetsResultInfoMap; + Map topicPartitionLatestListOffsetsResultInfoMap; topicPartitionLatestListOffsetsResultInfoMap = new HashMap<>(); - topicPartitionLatestListOffsetsResultInfoMap.put(tAp1,tAp1LatestListOffsetsResultInfo); - topicPartitionLatestListOffsetsResultInfoMap.put(tAp2,tAp2LatestListOffsetsResultInfo); - topicPartitionLatestListOffsetsResultInfoMap.put(tBp1,tBp1LatestListOffsetsResultInfo); - topicPartitionLatestListOffsetsResultInfoMap.put(tBp2,tBp2LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(tAp1, tAp1LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(tAp2, tAp2LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(tBp1, tBp1LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(tBp2, tBp2LatestListOffsetsResultInfo); when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap); listOffsetsResultEarliest = mock(ListOffsetsResult.class); when(listOffsetsResultEarliest.all()).thenReturn(kafkaFuture); - admin=mock(Admin.class); + admin = mock(Admin.class); when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResultEarliest); OffsetManager offsetManagerTaP1 = mock(OffsetManager.class); @@ -75,51 +75,51 @@ public void registerMetricsGetLatest() throws ExecutionException, InterruptedExc OffsetManager offsetManagerTbP2 = mock(OffsetManager.class); when(offsetManagerTbP2.getCommittedOffset()).thenReturn(350L); - offsetManagers= new HashMap<>(); - offsetManagers.put(tAp1,offsetManagerTaP1); - 
offsetManagers.put(tAp2,offsetManagerTaP2); - offsetManagers.put(tBp1,offsetManagerTbP1); - offsetManagers.put(tBp2,offsetManagerTbP2); + offsetManagers = new HashMap<>(); + offsetManagers.put(tAp1, offsetManagerTaP1); + offsetManagers.put(tAp2, offsetManagerTaP2); + offsetManagers.put(tBp1, offsetManagerTbP1); + offsetManagers.put(tBp2, offsetManagerTbP2); - assignment=new HashSet<>(); + assignment = new HashSet<>(); assignment.add(tAp1); assignment.add(tAp2); assignment.add(tBp1); assignment.add(tBp2); - KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics = new KafkaOffsetPartitionAndTopicMetrics(() -> Collections.unmodifiableMap(offsetManagers),() -> admin,assignment); - Map result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); - Gauge g1= (Gauge) result.get("topicA/partition_1/spoutLag"); - Gauge g2= (Gauge) result.get("topicA/partition_2/spoutLag"); - Gauge g3= (Gauge) result.get("topicB/partition_1/spoutLag"); - Gauge g4= (Gauge) result.get("topicB/partition_2/spoutLag"); - assertEquals(g1.getValue(),10L); - assertEquals(g2.getValue(),30L); - assertEquals(g3.getValue(),100L); - assertEquals(g4.getValue(),50L); + KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics = new KafkaOffsetPartitionAndTopicMetrics(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, assignment); + Map result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); + Gauge g1 = (Gauge) result.get("topicA/partition_1/spoutLag"); + Gauge g2 = (Gauge) result.get("topicA/partition_2/spoutLag"); + Gauge g3 = (Gauge) result.get("topicB/partition_1/spoutLag"); + Gauge g4 = (Gauge) result.get("topicB/partition_2/spoutLag"); + assertEquals(g1.getValue(), 10L); + assertEquals(g2.getValue(), 30L); + assertEquals(g3.getValue(), 100L); + assertEquals(g4.getValue(), 50L); - Gauge gATotal= (Gauge) result.get("topicA/totalSpoutLag"); - assertEquals(gATotal.getValue(),40L); - Gauge gBTotal= (Gauge) result.get("topicB/totalSpoutLag"); - 
assertEquals(gBTotal.getValue(),150L); + Gauge gATotal = (Gauge) result.get("topicA/totalSpoutLag"); + assertEquals(gATotal.getValue(), 40L); + Gauge gBTotal = (Gauge) result.get("topicB/totalSpoutLag"); + assertEquals(gBTotal.getValue(), 150L); //get the metrics a second time. Values should be the same. In particular, the total values for the topic should not accumulate. Each call to getMetrics should reset the total values. result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); - g1= (Gauge) result.get("topicA/partition_1/spoutLag"); - g2= (Gauge) result.get("topicA/partition_2/spoutLag"); - g3= (Gauge) result.get("topicB/partition_1/spoutLag"); - g4= (Gauge) result.get("topicB/partition_2/spoutLag"); - assertEquals(g1.getValue(),10L); - assertEquals(g2.getValue(),30L); - assertEquals(g3.getValue(),100L); - assertEquals(g4.getValue(),50L); + g1 = (Gauge) result.get("topicA/partition_1/spoutLag"); + g2 = (Gauge) result.get("topicA/partition_2/spoutLag"); + g3 = (Gauge) result.get("topicB/partition_1/spoutLag"); + g4 = (Gauge) result.get("topicB/partition_2/spoutLag"); + assertEquals(g1.getValue(), 10L); + assertEquals(g2.getValue(), 30L); + assertEquals(g3.getValue(), 100L); + assertEquals(g4.getValue(), 50L); - gATotal= (Gauge) result.get("topicA/totalSpoutLag"); - assertEquals(gATotal.getValue(),40L); - gBTotal= (Gauge) result.get("topicB/totalSpoutLag"); - assertEquals(gBTotal.getValue(),150L); + gATotal = (Gauge) result.get("topicA/totalSpoutLag"); + assertEquals(gATotal.getValue(), 40L); + gBTotal = (Gauge) result.get("topicB/totalSpoutLag"); + assertEquals(gBTotal.getValue(), 150L); } @@ -127,74 +127,72 @@ public void registerMetricsGetLatest() throws ExecutionException, InterruptedExc @Test public void registerMetricsGetEarliest() throws ExecutionException, InterruptedException { - TopicPartition tAp1 = new TopicPartition("topicA",1); - TopicPartition tAp2 = new TopicPartition("topicA",2); - TopicPartition tBp1 = new TopicPartition("topicB",1); - 
TopicPartition tBp2 = new TopicPartition("topicB",2); + TopicPartition tAp1 = new TopicPartition("topicA", 1); + TopicPartition tAp2 = new TopicPartition("topicA", 2); + TopicPartition tBp1 = new TopicPartition("topicB", 1); + TopicPartition tBp2 = new TopicPartition("topicB", 2); - ListOffsetsResult.ListOffsetsResultInfo tAp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1,System.currentTimeMillis(),Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tAp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1,System.currentTimeMillis(),Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1,System.currentTimeMillis(),Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1,System.currentTimeMillis(),Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tAp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tAp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); - Map topicPartitionEarliestListOffsetsResultInfoMap; + Map topicPartitionEarliestListOffsetsResultInfoMap; topicPartitionEarliestListOffsetsResultInfoMap = new HashMap<>(); - topicPartitionEarliestListOffsetsResultInfoMap.put(tAp1,tAp1EarliestListOffsetsResultInfo); - 
topicPartitionEarliestListOffsetsResultInfoMap.put(tAp2,tAp2EarliestListOffsetsResultInfo); - topicPartitionEarliestListOffsetsResultInfoMap.put(tBp1,tBp1EarliestListOffsetsResultInfo); - topicPartitionEarliestListOffsetsResultInfoMap.put(tBp2,tBp2EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(tAp1, tAp1EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(tAp2, tAp2EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(tBp1, tBp1EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(tBp2, tBp2EarliestListOffsetsResultInfo); when(kafkaFuture.get()).thenReturn(topicPartitionEarliestListOffsetsResultInfoMap); listOffsetsResultEarliest = mock(ListOffsetsResult.class); when(listOffsetsResultEarliest.all()).thenReturn(kafkaFuture); - admin=mock(Admin.class); + admin = mock(Admin.class); when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResultEarliest); - offsetManagers= new HashMap<>(); + offsetManagers = new HashMap<>(); - assignment=new HashSet<>(); + assignment = new HashSet<>(); assignment.add(tAp1); assignment.add(tAp2); assignment.add(tBp1); assignment.add(tBp2); - KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics = new KafkaOffsetPartitionAndTopicMetrics(() -> Collections.unmodifiableMap(offsetManagers),() -> admin,assignment); - Map result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); - Gauge g1= (Gauge) result.get("topicA/partition_1/earliestTimeOffset"); - Gauge g2= (Gauge) result.get("topicA/partition_2/earliestTimeOffset"); - Gauge g3= (Gauge) result.get("topicB/partition_1/earliestTimeOffset"); - Gauge g4= (Gauge) result.get("topicB/partition_2/earliestTimeOffset"); - assertEquals(g1.getValue(),1L); - assertEquals(g2.getValue(),1L); - assertEquals(g3.getValue(),1L); - assertEquals(g4.getValue(),1L); - - Gauge gATotal= (Gauge) result.get("topicA/totalEarliestTimeOffset"); - 
assertEquals(2L,gATotal.getValue()); - Gauge gBTotal= (Gauge) result.get("topicB/totalEarliestTimeOffset"); - assertEquals(2L,gBTotal.getValue()); + KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics = new KafkaOffsetPartitionAndTopicMetrics(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, assignment); + Map result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); + Gauge g1 = (Gauge) result.get("topicA/partition_1/earliestTimeOffset"); + Gauge g2 = (Gauge) result.get("topicA/partition_2/earliestTimeOffset"); + Gauge g3 = (Gauge) result.get("topicB/partition_1/earliestTimeOffset"); + Gauge g4 = (Gauge) result.get("topicB/partition_2/earliestTimeOffset"); + assertEquals(g1.getValue(), 1L); + assertEquals(g2.getValue(), 1L); + assertEquals(g3.getValue(), 1L); + assertEquals(g4.getValue(), 1L); + + Gauge gATotal = (Gauge) result.get("topicA/totalEarliestTimeOffset"); + assertEquals(2L, gATotal.getValue()); + Gauge gBTotal = (Gauge) result.get("topicB/totalEarliestTimeOffset"); + assertEquals(2L, gBTotal.getValue()); //get the metrics a second time. Values should be the same. In particular, the total values for the topic should not accumulate. Each call to getMetrics should reset the total values. 
result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); - g1=(Gauge) result.get("topicA/partition_1/earliestTimeOffset"); - g2=(Gauge) result.get("topicA/partition_2/earliestTimeOffset"); - g3=(Gauge) result.get("topicB/partition_1/earliestTimeOffset"); - g4=(Gauge) result.get("topicB/partition_2/earliestTimeOffset"); - assertEquals(g1.getValue(),1L); - assertEquals(g2.getValue(),1L); - assertEquals(g3.getValue(),1L); - assertEquals(g4.getValue(),1L); - - gATotal= (Gauge) result.get("topicA/totalEarliestTimeOffset"); - assertEquals(2L,gATotal.getValue()); - gBTotal= (Gauge) result.get("topicB/totalEarliestTimeOffset"); - assertEquals(2L,gBTotal.getValue()); - - + g1 = (Gauge) result.get("topicA/partition_1/earliestTimeOffset"); + g2 = (Gauge) result.get("topicA/partition_2/earliestTimeOffset"); + g3 = (Gauge) result.get("topicB/partition_1/earliestTimeOffset"); + g4 = (Gauge) result.get("topicB/partition_2/earliestTimeOffset"); + assertEquals(g1.getValue(), 1L); + assertEquals(g2.getValue(), 1L); + assertEquals(g3.getValue(), 1L); + assertEquals(g4.getValue(), 1L); + + gATotal = (Gauge) result.get("topicA/totalEarliestTimeOffset"); + assertEquals(2L, gATotal.getValue()); + gBTotal = (Gauge) result.get("topicB/totalEarliestTimeOffset"); + assertEquals(2L, gBTotal.getValue()); } } From 1a084f2a83f81ab97198f48755212739e823b6a1 Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Wed, 23 Apr 2025 08:10:59 +0100 Subject: [PATCH 06/17] Adding unit tests (cherry picked from commit 83bbf231cc16f038076568105055bc466edfb64e) --- .../apache/storm/kafka/spout/KafkaSpout.java | 59 ++++++++++--------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 2b35fdd9272..764ee643b65 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -39,6 +39,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; + import org.apache.commons.lang.Validate; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; @@ -155,7 +156,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputC tupleListener.open(conf, context); this.kafkaOffsetMetricManager - = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, context); + = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, context); LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig); } @@ -184,7 +185,7 @@ public void onPartitionsRevoked(Collection partitions) { previousAssignment = partitions; LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions); + kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions); if (isAtLeastOnceProcessing()) { commitOffsetsForAckedTuples(); @@ -194,7 +195,7 @@ public void onPartitionsRevoked(Collection partitions) { @Override public void onPartitionsAssigned(Collection partitions) { LOG.info("Partitions reassignment. 
[task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]", - context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions); + context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions); initialize(partitions); tupleListener.onPartitionsReassigned(partitions); @@ -222,7 +223,7 @@ private void initialize(Collection partitions) { final OffsetAndMetadata committedOffset = consumer.committed(newTp); final long fetchOffset = doSeek(newTp, committedOffset); LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]", - fetchOffset, newTp, firstPollOffsetStrategy, committedOffset); + fetchOffset, newTp, firstPollOffsetStrategy, committedOffset); if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(newTp)) { offsetManagers.put(newTp, new OffsetManager(newTp, fetchOffset)); } @@ -235,13 +236,13 @@ private void initialize(Collection partitions) { */ private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) { LOG.trace("Seeking offset for topic-partition [{}] with [{}] and committed offset [{}]", - newTp, firstPollOffsetStrategy, committedOffset); + newTp, firstPollOffsetStrategy, committedOffset); if (committedOffset != null) { // offset was previously committed for this consumer group and topic-partition, either by this or another topology. if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp, - committedOffset, - Collections.unmodifiableMap(offsetManagers))) { + committedOffset, + Collections.unmodifiableMap(offsetManagers))) { // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply. 
consumer.seek(newTp, committedOffset.offset()); } else { @@ -282,7 +283,7 @@ public void nextTuple() { commitOffsetsForAckedTuples(); } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) { Map offsetsToCommit = - createFetchedOffsetsMetadata(consumer.assignment()); + createFetchedOffsetsMetadata(consumer.assignment()); consumer.commitAsync(offsetsToCommit, null); LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); } @@ -337,7 +338,7 @@ private PollablePartitionsInfo getPollablePartitionsInfo() { pollablePartitions.add(tp); } else { LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ", tp, - numUncommittedOffsets, maxUncommittedOffsets); + numUncommittedOffsets, maxUncommittedOffsets); } } } @@ -346,7 +347,7 @@ private PollablePartitionsInfo getPollablePartitionsInfo() { private boolean isWaitingToEmit() { return waitingToEmit.values().stream() - .anyMatch(list -> !list.isEmpty()); + .anyMatch(list -> !list.isEmpty()); } private void setWaitingToEmit(ConsumerRecords consumerRecords) { @@ -366,11 +367,11 @@ private ConsumerRecords pollKafkaBroker(PollablePartitionsInfo pollablePar ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords); final int numPolledRecords = consumerRecords.count(); LOG.debug("Polled [{}] records from Kafka", - numPolledRecords); + numPolledRecords); if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) { //Commit polled records immediately to ensure delivery is at-most-once. 
Map offsetsToCommit = - createFetchedOffsetsMetadata(consumer.assignment()); + createFetchedOffsetsMetadata(consumer.assignment()); consumer.commitSync(offsetsToCommit); LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); } @@ -388,7 +389,7 @@ private void doSeekRetriableTopicPartitions(Map pollableEa } private void ackRetriableOffsetsIfCompactedAway(Map earliestRetriableOffsets, - ConsumerRecords consumerRecords) { + ConsumerRecords consumerRecords) { for (Entry entry : earliestRetriableOffsets.entrySet()) { TopicPartition tp = entry.getKey(); List> records = consumerRecords.records(tp); @@ -530,7 +531,7 @@ private void commitOffsetsForAckedTuples() { * to the committed offset. */ LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]", - position, committedOffset); + position, committedOffset); consumer.seek(tp, committedOffset); } /** @@ -540,8 +541,8 @@ private void commitOffsetsForAckedTuples() { if (waitingToEmitForTp != null) { //Discard the pending records that are already committed waitingToEmit.put(tp, waitingToEmitForTp.stream() - .filter(record -> record.offset() >= committedOffset) - .collect(Collectors.toCollection(LinkedList::new))); + .filter(record -> record.offset() >= committedOffset) + .collect(Collectors.toCollection(LinkedList::new))); } final OffsetManager offsetManager = offsetManagers.get(tp); @@ -574,11 +575,11 @@ public void ack(Object messageId) { if (!emitted.contains(msgId)) { LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " - + "came from a topic-partition that this consumer group instance is no longer tracking " - + "due to rebalance/partition reassignment. No action taken.", msgId); + + "came from a topic-partition that this consumer group instance is no longer tracking " + + "due to rebalance/partition reassignment. 
No action taken.", msgId); } else { Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked." - + " This should never occur barring errors in the RetryService implementation or the spout code."); + + " This should never occur barring errors in the RetryService implementation or the spout code."); offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); emitted.remove(msgId); } @@ -596,11 +597,11 @@ public void fail(Object messageId) { final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; if (!emitted.contains(msgId)) { LOG.debug("Received fail for tuple this spout is no longer tracking." - + " Partitions may have been reassigned. Ignoring message [{}]", msgId); + + " Partitions may have been reassigned. Ignoring message [{}]", msgId); return; } Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed." - + " This should never occur barring errors in the RetryService implementation or the spout code."); + + " This should never occur barring errors in the RetryService implementation or the spout code."); msgId.incrementNumFails(); @@ -631,7 +632,7 @@ private void refreshAssignment() { List allPartitionsSorted = new ArrayList<>(allPartitions); Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE); Set assignedPartitions = kafkaSpoutConfig.getTopicPartitioner() - .getPartitionsForThisTask(allPartitionsSorted, context); + .getPartitionsForThisTask(allPartitionsSorted, context); boolean partitionChanged = topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener); if (partitionChanged && canRegisterMetrics()) { LOG.info("Partitions assignments has changed, updating metrics."); @@ -684,9 +685,9 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { @Override public String toString() { return "KafkaSpout{" - + "offsetManagers =" + offsetManagers - + ", emitted=" + emitted - + 
"}"; + + "offsetManagers =" + offsetManagers + + ", emitted=" + emitted + + "}"; } @Override @@ -719,8 +720,8 @@ private boolean isPrimitiveOrWrapper(Class type) { private boolean isWrapper(Class type) { return type == Double.class || type == Float.class || type == Long.class - || type == Integer.class || type == Short.class || type == Character.class - || type == Byte.class || type == Boolean.class || type == String.class; + || type == Integer.class || type == Short.class || type == Character.class + || type == Byte.class || type == Boolean.class || type == String.class; } private String getTopicsString() { @@ -736,8 +737,8 @@ private static class PollablePartitionsInfo { PollablePartitionsInfo(Set pollablePartitions, Map earliestRetriableOffsets) { this.pollablePartitions = pollablePartitions; this.pollableEarliestRetriableOffsets = earliestRetriableOffsets.entrySet().stream() - .filter(entry -> pollablePartitions.contains(entry.getKey())) - .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())); + .filter(entry -> pollablePartitions.contains(entry.getKey())) + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())); } public boolean shouldPoll() { From 43195c91211f6bbc8a50901e04a4c2c349fad7f6 Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Wed, 23 Apr 2025 08:38:44 +0100 Subject: [PATCH 07/17] Adding unit tests (cherry picked from commit c07874c4d2b65e6c4b109899c93abf6dc3fa8248) --- .../KafkaOffsetPartitionAndTopicMetrics.java | 36 +++++++++--------- ...fkaOffsetPartitionAndTopicMetricsTest.java | 38 ++++++++++++++++--- 2 files changed, 51 insertions(+), 23 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java index 707796d9d9c..8cb4cae2d1a 100644 --- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -55,6 +55,24 @@ public KafkaOffsetPartitionAndTopicMetrics(Supplier getOffsets(Admin admin, Set topicPartitions, OffsetSpec offsetSpec) + throws InterruptedException, ExecutionException { + + Map offsetSpecMap = new HashMap<>(); + for (TopicPartition topicPartition : topicPartitions) { + offsetSpecMap.put(topicPartition, offsetSpec); + } + Map ret = new HashMap<>(); + ListOffsetsResult listOffsetsResult = admin.listOffsets(offsetSpecMap); + KafkaFuture> all = listOffsetsResult.all(); + Map topicPartitionListOffsetsResultInfoMap = all.get(); + for (Map.Entry entry : + topicPartitionListOffsetsResultInfoMap.entrySet()) { + ret.put(entry.getKey(), entry.getValue().offset()); + } + return ret; + } + @Override public Map getMetrics() { @@ -193,22 +211,4 @@ private Map getEndOffsets(Set topicPartiti } return endOffsets; } - - private static Map getOffsets(Admin admin, Set topicPartitions, OffsetSpec offsetSpec) - throws InterruptedException, ExecutionException { - - Map offsetSpecMap = new HashMap<>(); - for (TopicPartition topicPartition : topicPartitions) { - offsetSpecMap.put(topicPartition, offsetSpec); - } - Map ret = new HashMap<>(); - ListOffsetsResult listOffsetsResult = admin.listOffsets(offsetSpecMap); - KafkaFuture> all = listOffsetsResult.all(); - Map topicPartitionListOffsetsResultInfoMap = all.get(); - for (Map.Entry entry : - topicPartitionListOffsetsResultInfoMap.entrySet()) { - ret.put(entry.getKey(), entry.getValue().offset()); - } - return ret; - } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java index 7fcd66681ae..da1493d76f2 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java @@ -35,7 +35,7 @@ public void initializeTests() { } @Test - public void registerMetricsGetLatest() throws ExecutionException, InterruptedException { + public void registerMetricsGetSpoutLag() throws ExecutionException, InterruptedException { TopicPartition tAp1 = new TopicPartition("topicA", 1); TopicPartition tAp2 = new TopicPartition("topicA", 2); @@ -125,7 +125,7 @@ public void registerMetricsGetLatest() throws ExecutionException, InterruptedExc } @Test - public void registerMetricsGetEarliest() throws ExecutionException, InterruptedException { + public void registerMetricsGetEarliestAndLatest() throws ExecutionException, InterruptedException { TopicPartition tAp1 = new TopicPartition("topicA", 1); TopicPartition tAp2 = new TopicPartition("topicA", 2); @@ -137,9 +137,7 @@ public void registerMetricsGetEarliest() throws ExecutionException, InterruptedE ListOffsetsResult.ListOffsetsResultInfo tBp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); ListOffsetsResult.ListOffsetsResultInfo tBp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); - Map topicPartitionEarliestListOffsetsResultInfoMap; - - topicPartitionEarliestListOffsetsResultInfoMap = new HashMap<>(); + Map topicPartitionEarliestListOffsetsResultInfoMap = new HashMap<>(); topicPartitionEarliestListOffsetsResultInfoMap.put(tAp1, tAp1EarliestListOffsetsResultInfo); topicPartitionEarliestListOffsetsResultInfoMap.put(tAp2, 
tAp2EarliestListOffsetsResultInfo); topicPartitionEarliestListOffsetsResultInfoMap.put(tBp1, tBp1EarliestListOffsetsResultInfo); @@ -194,5 +192,35 @@ public void registerMetricsGetEarliest() throws ExecutionException, InterruptedE assertEquals(2L, gATotal.getValue()); gBTotal = (Gauge) result.get("topicB/totalEarliestTimeOffset"); assertEquals(2L, gBTotal.getValue()); + + //get the latest offsets + + ListOffsetsResult.ListOffsetsResultInfo tAp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tAp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400, System.currentTimeMillis(), Optional.empty()); + + Map topicPartitionLatestListOffsetsResultInfoMap = new HashMap<>(); + topicPartitionLatestListOffsetsResultInfoMap.put(tAp1, tAp1LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(tAp2, tAp2LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(tBp1, tBp1LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(tBp2, tBp2LatestListOffsetsResultInfo); + + when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap); + + result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); + g1 = (Gauge) result.get("topicA/partition_1/latestTimeOffset"); + g2 = (Gauge) result.get("topicA/partition_2/latestTimeOffset"); + g3 = (Gauge) result.get("topicB/partition_1/latestTimeOffset"); + g4 = (Gauge) result.get("topicB/partition_2/latestTimeOffset"); + assertEquals(100L, g1.getValue()); + assertEquals(200L, 
g2.getValue()); + assertEquals(300L, g3.getValue()); + assertEquals(400L, g4.getValue()); + + gATotal = (Gauge) result.get("topicA/totalLatestTimeOffset"); + assertEquals(300L, gATotal.getValue()); + gBTotal = (Gauge) result.get("topicB/totalLatestTimeOffset"); + assertEquals(700L, gBTotal.getValue()); } } From 667e0b001e35d6f84c88112ab06868532591c607 Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Wed, 23 Apr 2025 09:08:25 +0100 Subject: [PATCH 08/17] Adding unit tests (cherry picked from commit e1dcaa1cd447456f00f69bf90c9da1e2cd317dde) --- .../metrics2/KafkaOffsetMetricManager.java | 16 +++---- .../KafkaOffsetPartitionAndTopicMetrics.java | 4 +- ...fkaOffsetPartitionAndTopicMetricsTest.java | 48 +++++++++++++++++++ 3 files changed, 59 insertions(+), 9 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java index c78b5637e89..c0d7631f68c 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,11 +18,6 @@ package org.apache.storm.kafka.spout.metrics2; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.function.Supplier; - import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.internal.OffsetManager; @@ -30,6 +25,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + /** * This class is used to manage both the partition and topic level offset metrics. */ @@ -57,7 +57,7 @@ public KafkaOffsetMetricManager(Supplier> off public void registerPartitionAndTopicLevelMetrics(Set newAssignment) { KafkaOffsetPartitionAndTopicMetrics topicPartitionMetricSet - = new KafkaOffsetPartitionAndTopicMetrics(offsetManagerSupplier, adminSupplier,newAssignment); + = new KafkaOffsetPartitionAndTopicMetrics(offsetManagerSupplier, adminSupplier, newAssignment); topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet); } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java index 8cb4cae2d1a..fb9d3d31c3e 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -48,7 +48,9 @@ public class KafkaOffsetPartitionAndTopicMetrics implements MetricSet { private final Supplier adminSupplier; private final Set assignment; - 
public KafkaOffsetPartitionAndTopicMetrics(Supplier> offsetManagerSupplier, Supplier adminSupplier, Set assignment) { + public KafkaOffsetPartitionAndTopicMetrics(Supplier> offsetManagerSupplier, + Supplier adminSupplier, + Set assignment) { this.offsetManagerSupplier = offsetManagerSupplier; this.adminSupplier = adminSupplier; this.assignment = assignment; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java index da1493d76f2..dff2ad1bb94 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java @@ -151,7 +151,27 @@ public void registerMetricsGetEarliestAndLatest() throws ExecutionException, Int admin = mock(Admin.class); when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResultEarliest); + OffsetManager offsetManagerTaP1 = mock(OffsetManager.class); + when(offsetManagerTaP1.getLatestEmittedOffset()).thenReturn(50L); + when(offsetManagerTaP1.getCommittedOffset()).thenReturn(40L); + + OffsetManager offsetManagerTaP2 = mock(OffsetManager.class); + when(offsetManagerTaP2.getLatestEmittedOffset()).thenReturn(100L); + when(offsetManagerTaP2.getCommittedOffset()).thenReturn(90L); + + OffsetManager offsetManagerTbP1 = mock(OffsetManager.class); + when(offsetManagerTbP1.getLatestEmittedOffset()).thenReturn(150L); + when(offsetManagerTbP1.getCommittedOffset()).thenReturn(149L); + + OffsetManager offsetManagerTbP2 = mock(OffsetManager.class); + when(offsetManagerTbP2.getLatestEmittedOffset()).thenReturn(200L); + when(offsetManagerTbP2.getCommittedOffset()).thenReturn(200L); + offsetManagers = new HashMap<>(); + offsetManagers.put(tAp1, offsetManagerTaP1); + 
offsetManagers.put(tAp2, offsetManagerTaP2); + offsetManagers.put(tBp1, offsetManagerTbP1); + offsetManagers.put(tBp2, offsetManagerTbP2); assignment = new HashSet<>(); assignment.add(tAp1); @@ -222,5 +242,33 @@ public void registerMetricsGetEarliestAndLatest() throws ExecutionException, Int assertEquals(300L, gATotal.getValue()); gBTotal = (Gauge) result.get("topicB/totalLatestTimeOffset"); assertEquals(700L, gBTotal.getValue()); + + g1 = (Gauge) result.get("topicA/partition_1/latestEmittedOffset"); + g2 = (Gauge) result.get("topicA/partition_2/latestEmittedOffset"); + g3 = (Gauge) result.get("topicB/partition_1/latestEmittedOffset"); + g4 = (Gauge) result.get("topicB/partition_2/latestEmittedOffset"); + assertEquals(50L, g1.getValue()); + assertEquals(100L, g2.getValue()); + assertEquals(150L, g3.getValue()); + assertEquals(200L, g4.getValue()); + + gATotal = (Gauge) result.get("topicA/totalLatestEmittedOffset"); + assertEquals(150L, gATotal.getValue()); + gBTotal = (Gauge) result.get("topicB/totalLatestEmittedOffset"); + assertEquals(350L, gBTotal.getValue()); + + g1 = (Gauge) result.get("topicA/partition_1/latestCompletedOffset"); + g2 = (Gauge) result.get("topicA/partition_2/latestCompletedOffset"); + g3 = (Gauge) result.get("topicB/partition_1/latestCompletedOffset"); + g4 = (Gauge) result.get("topicB/partition_2/latestCompletedOffset"); + assertEquals(40L, g1.getValue()); + assertEquals(90L, g2.getValue()); + assertEquals(149L, g3.getValue()); + assertEquals(200L, g4.getValue()); + + gATotal = (Gauge) result.get("topicA/totalLatestCompletedOffset"); + assertEquals(130L, gATotal.getValue()); + gBTotal = (Gauge) result.get("topicB/totalLatestCompletedOffset"); + assertEquals(349L, gBTotal.getValue()); } } From b7a0eafc3635cf996e25c54e6aa7e2257a292d46 Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Wed, 23 Apr 2025 09:15:27 +0100 Subject: [PATCH 09/17] Adding unit tests (cherry picked from commit 2a8198ebc0e1673a90d6aec44fff171e3d7b2f83) --- 
.../spout/metrics2/KafkaOffsetMetricManager.java | 8 ++++---- .../KafkaOffsetPartitionAndTopicMetrics.java | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java index c0d7631f68c..9b6f662f541 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java @@ -18,6 +18,10 @@ package org.apache.storm.kafka.spout.metrics2; +import java.util.function.Supplier; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.internal.OffsetManager; @@ -25,10 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.function.Supplier; /** * This class is used to manage both the partition and topic level offset metrics. 
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java index fb9d3d31c3e..1b91636d5a2 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -3,6 +3,12 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricSet; +import java.util.function.Supplier; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.OffsetSpec; @@ -13,12 +19,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.function.Supplier; /** * This class is used compute the partition and topic level offset metrics. 
From 7f83cf14f31db05241c8fc69689f1cbe3d6d33e7 Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Wed, 23 Apr 2025 22:52:31 +0100 Subject: [PATCH 10/17] Adding unit tests (cherry picked from commit 4dcd178de560c594c74a3dd9121c11afede5da44) --- .../KafkaOffsetPartitionAndTopicMetrics.java | 2 - ...fkaOffsetPartitionAndTopicMetricsTest.java | 75 ++++++++++++++----- 2 files changed, 55 insertions(+), 22 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java index 1b91636d5a2..1d98624098b 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -169,7 +169,6 @@ public Map getMetrics() { metrics.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffsetGauge); metrics.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffsetGauge); metrics.put(metricPath + "/" + "recordsInPartition", recordsInPartitionGauge); - } for (KafkaOffsetTopicMetrics kafkaOffsetTopicMetrics : topicMetricsMap.values()) { @@ -177,7 +176,6 @@ public Map getMetrics() { } return metrics; - } private Map getBeginningOffsets(Set topicPartitions) { diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java index dff2ad1bb94..02789b90595 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java +++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java @@ -1,5 +1,13 @@ package org.apache.storm.kafka.spout.metric2; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; import com.codahale.metrics.Gauge; import com.codahale.metrics.Metric; import org.apache.kafka.clients.admin.Admin; @@ -13,9 +21,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; -import java.util.*; -import java.util.concurrent.ExecutionException; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; @@ -25,7 +30,6 @@ public class KafkaOffsetPartitionAndTopicMetricsTest { private Set assignment; private Admin admin = mock(Admin.class); private HashMap offsetManagers; - private ListOffsetsResult listOffsetsResultEarliest; private KafkaFuture kafkaFuture = mock(KafkaFuture.class); @BeforeEach @@ -35,7 +39,7 @@ public void initializeTests() { } @Test - public void registerMetricsGetSpoutLag() throws ExecutionException, InterruptedException { + public void registerMetricsGetSpoutLagAndPartitionRecords() throws ExecutionException, InterruptedException { TopicPartition tAp1 = new TopicPartition("topicA", 1); TopicPartition tAp2 = new TopicPartition("topicA", 2); @@ -47,9 +51,7 @@ public void registerMetricsGetSpoutLag() throws ExecutionException, InterruptedE ListOffsetsResult.ListOffsetsResultInfo tBp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300, System.currentTimeMillis(), Optional.empty()); ListOffsetsResult.ListOffsetsResultInfo tBp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400, System.currentTimeMillis(), Optional.empty()); - Map topicPartitionLatestListOffsetsResultInfoMap; - - 
topicPartitionLatestListOffsetsResultInfoMap = new HashMap<>(); + Map topicPartitionLatestListOffsetsResultInfoMap = new HashMap<>(); topicPartitionLatestListOffsetsResultInfoMap.put(tAp1, tAp1LatestListOffsetsResultInfo); topicPartitionLatestListOffsetsResultInfoMap.put(tAp2, tAp2LatestListOffsetsResultInfo); topicPartitionLatestListOffsetsResultInfoMap.put(tBp1, tBp1LatestListOffsetsResultInfo); @@ -57,11 +59,11 @@ public void registerMetricsGetSpoutLag() throws ExecutionException, InterruptedE when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap); - listOffsetsResultEarliest = mock(ListOffsetsResult.class); - when(listOffsetsResultEarliest.all()).thenReturn(kafkaFuture); + ListOffsetsResult listOffsetsResult = mock(ListOffsetsResult.class); + when(listOffsetsResult.all()).thenReturn(kafkaFuture); admin = mock(Admin.class); - when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResultEarliest); + when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult); OffsetManager offsetManagerTaP1 = mock(OffsetManager.class); when(offsetManagerTaP1.getCommittedOffset()).thenReturn(90L); @@ -94,15 +96,15 @@ public void registerMetricsGetSpoutLag() throws ExecutionException, InterruptedE Gauge g2 = (Gauge) result.get("topicA/partition_2/spoutLag"); Gauge g3 = (Gauge) result.get("topicB/partition_1/spoutLag"); Gauge g4 = (Gauge) result.get("topicB/partition_2/spoutLag"); - assertEquals(g1.getValue(), 10L); - assertEquals(g2.getValue(), 30L); - assertEquals(g3.getValue(), 100L); - assertEquals(g4.getValue(), 50L); + assertEquals(10L, g1.getValue()); + assertEquals(30L, g2.getValue()); + assertEquals(100L, g3.getValue()); + assertEquals(50L, g4.getValue()); Gauge gATotal = (Gauge) result.get("topicA/totalSpoutLag"); - assertEquals(gATotal.getValue(), 40L); + assertEquals(40L, gATotal.getValue()); Gauge gBTotal = (Gauge) result.get("topicB/totalSpoutLag"); - assertEquals(gBTotal.getValue(), 150L); + assertEquals(150L, gBTotal.getValue()); //get 
the metrics a second time. Values should be the same. In particular, the total values for the topic should not accumulate. Each call to getMetrics should reset the total values. @@ -121,6 +123,39 @@ public void registerMetricsGetSpoutLag() throws ExecutionException, InterruptedE gBTotal = (Gauge) result.get("topicB/totalSpoutLag"); assertEquals(gBTotal.getValue(), 150L); + //get partition records + + ListOffsetsResult.ListOffsetsResultInfo tAp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tAp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(2, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(3, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo tBp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(4, System.currentTimeMillis(), Optional.empty()); + + Map topicPartitionEarliestListOffsetsResultInfoMap = new HashMap<>(); + topicPartitionEarliestListOffsetsResultInfoMap.put(tAp1, tAp1EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(tAp2, tAp2EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(tBp1, tBp1EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(tBp2, tBp2EarliestListOffsetsResultInfo); + + //mock consecutive calls. 
Each call to the recordsInPartition gauge will call kafkaFuture.get() twice + when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap, topicPartitionEarliestListOffsetsResultInfoMap, + topicPartitionLatestListOffsetsResultInfoMap, topicPartitionEarliestListOffsetsResultInfoMap, + topicPartitionLatestListOffsetsResultInfoMap, topicPartitionEarliestListOffsetsResultInfoMap, + topicPartitionLatestListOffsetsResultInfoMap, topicPartitionEarliestListOffsetsResultInfoMap); + + result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); + g1 = (Gauge) result.get("topicA/partition_1/recordsInPartition"); + g2 = (Gauge) result.get("topicA/partition_2/recordsInPartition"); + g3 = (Gauge) result.get("topicB/partition_1/recordsInPartition"); + g4 = (Gauge) result.get("topicB/partition_2/recordsInPartition"); + assertEquals(99L, g1.getValue()); + assertEquals(198L, g2.getValue()); + assertEquals(297L, g3.getValue()); + assertEquals(396L, g4.getValue()); + + gATotal = (Gauge) result.get("topicA/totalRecordsInPartitions"); + assertEquals(297L, gATotal.getValue()); + gBTotal = (Gauge) result.get("topicB/totalRecordsInPartitions"); + assertEquals(693L, gBTotal.getValue()); } @@ -145,11 +180,11 @@ public void registerMetricsGetEarliestAndLatest() throws ExecutionException, Int when(kafkaFuture.get()).thenReturn(topicPartitionEarliestListOffsetsResultInfoMap); - listOffsetsResultEarliest = mock(ListOffsetsResult.class); - when(listOffsetsResultEarliest.all()).thenReturn(kafkaFuture); + ListOffsetsResult listOffsetsResult = mock(ListOffsetsResult.class); + when(listOffsetsResult.all()).thenReturn(kafkaFuture); admin = mock(Admin.class); - when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResultEarliest); + when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult); OffsetManager offsetManagerTaP1 = mock(OffsetManager.class); when(offsetManagerTaP1.getLatestEmittedOffset()).thenReturn(50L); From 0b9e562ca47ae1afc19b966272a94a42f2e9ef1f Mon Sep 17 
00:00:00 2001 From: Rui Abreu Date: Wed, 23 Apr 2025 23:07:34 +0100 Subject: [PATCH 11/17] Adding unit tests (cherry picked from commit 08fd1a91c51173673a212ccd2fcab50f378761da) --- .../metrics2/KafkaOffsetMetricManager.java | 2 +- .../KafkaOffsetPartitionAndTopicMetrics.java | 2 +- ...fkaOffsetPartitionAndTopicMetricsTest.java | 152 +++++++++--------- 3 files changed, 78 insertions(+), 78 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java index 9b6f662f541..ad82ce4ec8b 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java @@ -18,10 +18,10 @@ package org.apache.storm.kafka.spout.metrics2; -import java.util.function.Supplier; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.internal.OffsetManager; diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java index 1d98624098b..da32549628b 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -3,12 +3,12 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricSet; -import java.util.function.Supplier; import 
java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.OffsetSpec; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java index 02789b90595..79f0c348926 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java @@ -41,21 +41,21 @@ public void initializeTests() { @Test public void registerMetricsGetSpoutLagAndPartitionRecords() throws ExecutionException, InterruptedException { - TopicPartition tAp1 = new TopicPartition("topicA", 1); - TopicPartition tAp2 = new TopicPartition("topicA", 2); - TopicPartition tBp1 = new TopicPartition("topicB", 1); - TopicPartition tBp2 = new TopicPartition("topicB", 2); + TopicPartition topicAPartition1 = new TopicPartition("topicA", 1); + TopicPartition topicAPartition2 = new TopicPartition("topicA", 2); + TopicPartition topicBPartition1 = new TopicPartition("topicB", 1); + TopicPartition topicBPartition2 = new TopicPartition("topicB", 2); - ListOffsetsResult.ListOffsetsResultInfo tAp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tAp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200, System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp1LatestListOffsetsResultInfo = new 
ListOffsetsResult.ListOffsetsResultInfo(300, System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicAPartition1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicAPartition2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicBPartition1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicBPartition2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400, System.currentTimeMillis(), Optional.empty()); Map topicPartitionLatestListOffsetsResultInfoMap = new HashMap<>(); - topicPartitionLatestListOffsetsResultInfoMap.put(tAp1, tAp1LatestListOffsetsResultInfo); - topicPartitionLatestListOffsetsResultInfoMap.put(tAp2, tAp2LatestListOffsetsResultInfo); - topicPartitionLatestListOffsetsResultInfoMap.put(tBp1, tBp1LatestListOffsetsResultInfo); - topicPartitionLatestListOffsetsResultInfoMap.put(tBp2, tBp2LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(topicAPartition2, topicAPartition2LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(topicBPartition1, topicBPartition1LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(topicBPartition2, topicBPartition2LatestListOffsetsResultInfo); when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap); @@ -65,29 +65,29 @@ 
public void registerMetricsGetSpoutLagAndPartitionRecords() throws ExecutionExce admin = mock(Admin.class); when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult); - OffsetManager offsetManagerTaP1 = mock(OffsetManager.class); - when(offsetManagerTaP1.getCommittedOffset()).thenReturn(90L); + OffsetManager offsetManagertopicAPartition1 = mock(OffsetManager.class); + when(offsetManagertopicAPartition1.getCommittedOffset()).thenReturn(90L); - OffsetManager offsetManagerTaP2 = mock(OffsetManager.class); - when(offsetManagerTaP2.getCommittedOffset()).thenReturn(170L); + OffsetManager offsetManagertopicAPartition2 = mock(OffsetManager.class); + when(offsetManagertopicAPartition2.getCommittedOffset()).thenReturn(170L); - OffsetManager offsetManagerTbP1 = mock(OffsetManager.class); - when(offsetManagerTbP1.getCommittedOffset()).thenReturn(200L); + OffsetManager offsetManagertopicBPartition1 = mock(OffsetManager.class); + when(offsetManagertopicBPartition1.getCommittedOffset()).thenReturn(200L); - OffsetManager offsetManagerTbP2 = mock(OffsetManager.class); - when(offsetManagerTbP2.getCommittedOffset()).thenReturn(350L); + OffsetManager offsetManagertopicBPartition2 = mock(OffsetManager.class); + when(offsetManagertopicBPartition2.getCommittedOffset()).thenReturn(350L); offsetManagers = new HashMap<>(); - offsetManagers.put(tAp1, offsetManagerTaP1); - offsetManagers.put(tAp2, offsetManagerTaP2); - offsetManagers.put(tBp1, offsetManagerTbP1); - offsetManagers.put(tBp2, offsetManagerTbP2); + offsetManagers.put(topicAPartition1, offsetManagertopicAPartition1); + offsetManagers.put(topicAPartition2, offsetManagertopicAPartition2); + offsetManagers.put(topicBPartition1, offsetManagertopicBPartition1); + offsetManagers.put(topicBPartition2, offsetManagertopicBPartition2); assignment = new HashSet<>(); - assignment.add(tAp1); - assignment.add(tAp2); - assignment.add(tBp1); - assignment.add(tBp2); + assignment.add(topicAPartition1); + assignment.add(topicAPartition2); + 
assignment.add(topicBPartition1); + assignment.add(topicBPartition2); KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics = new KafkaOffsetPartitionAndTopicMetrics(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, assignment); @@ -125,16 +125,16 @@ public void registerMetricsGetSpoutLagAndPartitionRecords() throws ExecutionExce //get partition records - ListOffsetsResult.ListOffsetsResultInfo tAp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tAp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(2, System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(3, System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(4, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicAPartition1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicAPartition2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(2, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicBPartition1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(3, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicBPartition2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(4, System.currentTimeMillis(), Optional.empty()); Map topicPartitionEarliestListOffsetsResultInfoMap = new HashMap<>(); - topicPartitionEarliestListOffsetsResultInfoMap.put(tAp1, tAp1EarliestListOffsetsResultInfo); - 
topicPartitionEarliestListOffsetsResultInfoMap.put(tAp2, tAp2EarliestListOffsetsResultInfo); - topicPartitionEarliestListOffsetsResultInfoMap.put(tBp1, tBp1EarliestListOffsetsResultInfo); - topicPartitionEarliestListOffsetsResultInfoMap.put(tBp2, tBp2EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(topicAPartition2, topicAPartition2EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(topicBPartition1, topicBPartition1EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(topicBPartition2, topicBPartition2EarliestListOffsetsResultInfo); //mock consecutive calls. Each call to the recordsInPartition gauge will call kafkaFuture.get() twice when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap, topicPartitionEarliestListOffsetsResultInfoMap, @@ -162,21 +162,21 @@ public void registerMetricsGetSpoutLagAndPartitionRecords() throws ExecutionExce @Test public void registerMetricsGetEarliestAndLatest() throws ExecutionException, InterruptedException { - TopicPartition tAp1 = new TopicPartition("topicA", 1); - TopicPartition tAp2 = new TopicPartition("topicA", 2); - TopicPartition tBp1 = new TopicPartition("topicB", 1); - TopicPartition tBp2 = new TopicPartition("topicB", 2); + TopicPartition topicAPartition1 = new TopicPartition("topicA", 1); + TopicPartition topicAPartition2 = new TopicPartition("topicA", 2); + TopicPartition topicBPartition1 = new TopicPartition("topicB", 1); + TopicPartition topicBPartition2 = new TopicPartition("topicB", 2); - ListOffsetsResult.ListOffsetsResultInfo tAp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tAp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, 
System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicAPartition1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicAPartition2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicBPartition1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicBPartition2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty()); Map topicPartitionEarliestListOffsetsResultInfoMap = new HashMap<>(); - topicPartitionEarliestListOffsetsResultInfoMap.put(tAp1, tAp1EarliestListOffsetsResultInfo); - topicPartitionEarliestListOffsetsResultInfoMap.put(tAp2, tAp2EarliestListOffsetsResultInfo); - topicPartitionEarliestListOffsetsResultInfoMap.put(tBp1, tBp1EarliestListOffsetsResultInfo); - topicPartitionEarliestListOffsetsResultInfoMap.put(tBp2, tBp2EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(topicAPartition2, topicAPartition2EarliestListOffsetsResultInfo); + topicPartitionEarliestListOffsetsResultInfoMap.put(topicBPartition1, topicBPartition1EarliestListOffsetsResultInfo); + 
topicPartitionEarliestListOffsetsResultInfoMap.put(topicBPartition2, topicBPartition2EarliestListOffsetsResultInfo); when(kafkaFuture.get()).thenReturn(topicPartitionEarliestListOffsetsResultInfoMap); @@ -186,33 +186,33 @@ public void registerMetricsGetEarliestAndLatest() throws ExecutionException, Int admin = mock(Admin.class); when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult); - OffsetManager offsetManagerTaP1 = mock(OffsetManager.class); - when(offsetManagerTaP1.getLatestEmittedOffset()).thenReturn(50L); - when(offsetManagerTaP1.getCommittedOffset()).thenReturn(40L); + OffsetManager offsetManagertopicAPartition1 = mock(OffsetManager.class); + when(offsetManagertopicAPartition1.getLatestEmittedOffset()).thenReturn(50L); + when(offsetManagertopicAPartition1.getCommittedOffset()).thenReturn(40L); - OffsetManager offsetManagerTaP2 = mock(OffsetManager.class); - when(offsetManagerTaP2.getLatestEmittedOffset()).thenReturn(100L); - when(offsetManagerTaP2.getCommittedOffset()).thenReturn(90L); + OffsetManager offsetManagertopicAPartition2 = mock(OffsetManager.class); + when(offsetManagertopicAPartition2.getLatestEmittedOffset()).thenReturn(100L); + when(offsetManagertopicAPartition2.getCommittedOffset()).thenReturn(90L); - OffsetManager offsetManagerTbP1 = mock(OffsetManager.class); - when(offsetManagerTbP1.getLatestEmittedOffset()).thenReturn(150L); - when(offsetManagerTbP1.getCommittedOffset()).thenReturn(149L); + OffsetManager offsetManagertopicBPartition1 = mock(OffsetManager.class); + when(offsetManagertopicBPartition1.getLatestEmittedOffset()).thenReturn(150L); + when(offsetManagertopicBPartition1.getCommittedOffset()).thenReturn(149L); - OffsetManager offsetManagerTbP2 = mock(OffsetManager.class); - when(offsetManagerTbP2.getLatestEmittedOffset()).thenReturn(200L); - when(offsetManagerTbP2.getCommittedOffset()).thenReturn(200L); + OffsetManager offsetManagertopicBPartition2 = mock(OffsetManager.class); + 
when(offsetManagertopicBPartition2.getLatestEmittedOffset()).thenReturn(200L); + when(offsetManagertopicBPartition2.getCommittedOffset()).thenReturn(200L); offsetManagers = new HashMap<>(); - offsetManagers.put(tAp1, offsetManagerTaP1); - offsetManagers.put(tAp2, offsetManagerTaP2); - offsetManagers.put(tBp1, offsetManagerTbP1); - offsetManagers.put(tBp2, offsetManagerTbP2); + offsetManagers.put(topicAPartition1, offsetManagertopicAPartition1); + offsetManagers.put(topicAPartition2, offsetManagertopicAPartition2); + offsetManagers.put(topicBPartition1, offsetManagertopicBPartition1); + offsetManagers.put(topicBPartition2, offsetManagertopicBPartition2); assignment = new HashSet<>(); - assignment.add(tAp1); - assignment.add(tAp2); - assignment.add(tBp1); - assignment.add(tBp2); + assignment.add(topicAPartition1); + assignment.add(topicAPartition2); + assignment.add(topicBPartition1); + assignment.add(topicBPartition2); KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics = new KafkaOffsetPartitionAndTopicMetrics(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, assignment); Map result = kafkaOffsetPartitionAndTopicMetrics.getMetrics(); @@ -250,16 +250,16 @@ public void registerMetricsGetEarliestAndLatest() throws ExecutionException, Int //get the latest offsets - ListOffsetsResult.ListOffsetsResultInfo tAp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tAp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200, System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300, System.currentTimeMillis(), Optional.empty()); - ListOffsetsResult.ListOffsetsResultInfo tBp2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400, System.currentTimeMillis(), 
Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicAPartition1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicAPartition2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicBPartition1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300, System.currentTimeMillis(), Optional.empty()); + ListOffsetsResult.ListOffsetsResultInfo topicBPartition2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400, System.currentTimeMillis(), Optional.empty()); Map topicPartitionLatestListOffsetsResultInfoMap = new HashMap<>(); - topicPartitionLatestListOffsetsResultInfoMap.put(tAp1, tAp1LatestListOffsetsResultInfo); - topicPartitionLatestListOffsetsResultInfoMap.put(tAp2, tAp2LatestListOffsetsResultInfo); - topicPartitionLatestListOffsetsResultInfoMap.put(tBp1, tBp1LatestListOffsetsResultInfo); - topicPartitionLatestListOffsetsResultInfoMap.put(tBp2, tBp2LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(topicAPartition2, topicAPartition2LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(topicBPartition1, topicBPartition1LatestListOffsetsResultInfo); + topicPartitionLatestListOffsetsResultInfoMap.put(topicBPartition2, topicBPartition2LatestListOffsetsResultInfo); when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap); From 4ecb9c8185b7e35c0daf207b19e28f593e927a9e Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Wed, 23 Apr 2025 23:11:16 +0100 Subject: [PATCH 12/17] Adding unit tests (cherry picked from commit 2e2d6fb0d8e5b6e2ad3edfe9311419c11fa90821) --- 
.../spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java index da32549628b..9bf56b44438 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -4,10 +4,10 @@ import com.codahale.metrics.Metric; import com.codahale.metrics.MetricSet; import java.util.Collections; -import java.util.concurrent.ExecutionException; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ListOffsetsResult; From 05560f90976ae04a28ebf636faf1c8cd043598b4 Mon Sep 17 00:00:00 2001 From: reiabreu Date: Thu, 24 Apr 2025 11:46:45 +0100 Subject: [PATCH 13/17] Update KafkaOffsetPartitionAndTopicMetrics.java (cherry picked from commit 44f1bdc32c0ac67aa9d8ca5f149a1e6ab789d187) --- .../KafkaOffsetPartitionAndTopicMetrics.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java index 9bf56b44438..6f6d766b491 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionAndTopicMetrics.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.storm.kafka.spout.metrics2; import com.codahale.metrics.Gauge; From 35c155e7cbbd86a252e6282560b2d911890ac67c Mon Sep 17 00:00:00 2001 From: reiabreu Date: Thu, 24 Apr 2025 11:47:31 +0100 Subject: [PATCH 14/17] Update KafkaOffsetPartitionAndTopicMetricsTest.java (cherry picked from commit be536d7fdfa5db6af524db392c4adbb856766fe4) --- ...afkaOffsetPartitionAndTopicMetricsTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java index 79f0c348926..cc774e8b24d 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionAndTopicMetricsTest.java @@ -1,3 +1,21 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.storm.kafka.spout.metric2; From 366cc8a66038f74c396b588cba9afb14d117c746 Mon Sep 17 00:00:00 2001 From: reiabreu Date: Thu, 24 Apr 2025 12:41:18 +0100 Subject: [PATCH 15/17] Delete external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java (cherry picked from commit 08f02575ed7d7498cfbbbd52eccda29f6d0c6d90) --- .../metrics2/KafkaOffsetPartitionMetrics.java | 227 ------------------ 1 file changed, 227 deletions(-) delete mode 100644 external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java deleted file mode 100644 index 0bb0d2c2168..00000000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout.metrics2; - -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Metric; -import com.codahale.metrics.MetricSet; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.function.Supplier; - -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.ListOffsetsResult; -import org.apache.kafka.clients.admin.OffsetSpec; -import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RetriableException; -import org.apache.storm.kafka.spout.internal.OffsetManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Partition level metrics. - *

- * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition - * topicName/partition_{number}/latestTimeOffset //gives end offset of the partition - * topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout - * topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout - * topicName/partition_{number}/spoutLag // the delta between the latest Offset and latestCompletedOffset - * topicName/partition_{number}/recordsInPartition // total number of records in the partition - *

- */ -public class KafkaOffsetPartitionMetrics implements MetricSet { - private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetPartitionMetrics.class); - private final Supplier> offsetManagerSupplier; - private final Supplier adminSupplier; - - private TopicPartition topicPartition; - private KafkaOffsetTopicMetrics topicMetrics; - - public KafkaOffsetPartitionMetrics(Supplier> offsetManagerSupplier, - Supplier adminSupplier, - TopicPartition topicPartition, - KafkaOffsetTopicMetrics topicMetrics) { - this.offsetManagerSupplier = offsetManagerSupplier; - this.adminSupplier = adminSupplier; - this.topicPartition = topicPartition; - this.topicMetrics = topicMetrics; - - LOG.info("Running KafkaOffsetMetricSet"); - } - - @Override - public Map getMetrics() { - Map metrics = new HashMap(); - - String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition(); - Gauge spoutLagGauge = new Gauge() { - @Override - public Long getValue() { - Map endOffsets = getEndOffsets(Collections.singleton(topicPartition)); - if (endOffsets == null || endOffsets.isEmpty()) { - LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition); - return 0L; - } - // add value to topic level metric - OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition); - Long ret = endOffsets.get(topicPartition) - offsetManager.getCommittedOffset(); - topicMetrics.totalSpoutLag += ret; - return ret; - } - }; - - Gauge earliestTimeOffsetGauge = new Gauge() { - @Override - public Long getValue() { - Map beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition)); - if (beginningOffsets == null || beginningOffsets.isEmpty()) { - LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition); - return 0L; - } - // add value to topic level metric - Long ret = beginningOffsets.get(topicPartition); - topicMetrics.totalEarliestTimeOffset += ret; - return ret; - } - }; - - Gauge 
latestTimeOffsetGauge = new Gauge() { - @Override - public Long getValue() { - Map endOffsets = getEndOffsets(Collections.singleton(topicPartition)); - if (endOffsets == null || endOffsets.isEmpty()) { - LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition); - return 0L; - } - // add value to topic level metric - Long ret = endOffsets.get(topicPartition); - topicMetrics.totalLatestTimeOffset += ret; - return ret; - } - }; - - Gauge latestEmittedOffsetGauge = new Gauge() { - @Override - public Long getValue() { - // add value to topic level metric - OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition); - Long ret = offsetManager.getLatestEmittedOffset(); - topicMetrics.totalLatestEmittedOffset += ret; - return ret; - } - }; - - Gauge latestCompletedOffsetGauge = new Gauge() { - @Override - public Long getValue() { - // add value to topic level metric - OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition); - Long ret = offsetManager.getCommittedOffset(); - topicMetrics.totalLatestCompletedOffset += ret; - return ret; - } - }; - - Gauge recordsInPartitionGauge = new Gauge() { - @Override - public Long getValue() { - Map endOffsets = getEndOffsets(Collections.singleton(topicPartition)); - if (endOffsets == null || endOffsets.isEmpty()) { - LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition); - return 0L; - } - Map beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition)); - if (beginningOffsets == null || beginningOffsets.isEmpty()) { - LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition); - return 0L; - } - // add value to topic level metric - Long ret = endOffsets.get(topicPartition) - beginningOffsets.get(topicPartition); - topicMetrics.totalRecordsInPartitions += ret; - return ret; - } - }; - - metrics.put(metricPath + "/" + "spoutLag", spoutLagGauge); - metrics.put(metricPath + 
"/" + "earliestTimeOffset", earliestTimeOffsetGauge); - metrics.put(metricPath + "/" + "latestTimeOffset", latestTimeOffsetGauge); - metrics.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffsetGauge); - metrics.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffsetGauge); - metrics.put(metricPath + "/" + "recordsInPartition", recordsInPartitionGauge); - - return metrics; - } - - private Map getBeginningOffsets(Set topicPartitions) { - Admin admin = adminSupplier.get(); - if (admin == null) { - LOG.error("Kafka admin object is null, returning 0."); - return Collections.EMPTY_MAP; - } - - Map beginningOffsets; - try { - beginningOffsets = getOffsets(admin, topicPartitions, OffsetSpec.earliest()); - } catch (RetriableException | ExecutionException | InterruptedException e) { - LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartition, e); - return Collections.EMPTY_MAP; - } - return beginningOffsets; - } - - private Map getEndOffsets(Set topicPartitions) { - Admin admin = adminSupplier.get(); - if (admin == null) { - LOG.error("Kafka admin object is null, returning 0."); - return Collections.EMPTY_MAP; - } - - Map endOffsets; - try { - endOffsets = getOffsets(admin, topicPartitions, OffsetSpec.latest()); - } catch (RetriableException | ExecutionException | InterruptedException e) { - LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartition, e); - return Collections.EMPTY_MAP; - } - return endOffsets; - } - - private static Map getOffsets(Admin admin, Set topicPartitions, OffsetSpec offsetSpec) - throws InterruptedException, ExecutionException { - - Map offsetSpecMap = new HashMap<>(); - for (TopicPartition topicPartition : topicPartitions) { - offsetSpecMap.put(topicPartition, offsetSpec); - } - Map ret = new HashMap<>(); - ListOffsetsResult listOffsetsResult = admin.listOffsets(offsetSpecMap); - KafkaFuture> all = listOffsetsResult.all(); - Map topicPartitionListOffsetsResultInfoMap 
= all.get(); - for (Map.Entry entry : - topicPartitionListOffsetsResultInfoMap.entrySet()) { - ret.put(entry.getKey(), entry.getValue().offset()); - } - return ret; - } -} From ab1d85ca41ac560eb75c4757a8404f99204792c6 Mon Sep 17 00:00:00 2001 From: reiabreu Date: Thu, 24 Apr 2025 13:11:32 +0100 Subject: [PATCH 16/17] Update KafkaOffsetMetricManager.java (cherry picked from commit ce3acec2ec307cce137661d38a75f8e7c00da857) --- .../spout/metrics2/KafkaOffsetMetricManager.java | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java index ad82ce4ec8b..dd96f6d1887 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java @@ -38,9 +38,8 @@ public class KafkaOffsetMetricManager { private final Supplier> offsetManagerSupplier; private final Supplier adminSupplier; private TopologyContext topologyContext; + private KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics; - private Map topicMetricsMap; - private Map topicPartitionMetricsMap; public KafkaOffsetMetricManager(Supplier> offsetManagerSupplier, Supplier adminSupplier, @@ -48,9 +47,7 @@ public KafkaOffsetMetricManager(Supplier> off this.offsetManagerSupplier = offsetManagerSupplier; this.adminSupplier = adminSupplier; this.topologyContext = topologyContext; - - this.topicMetricsMap = new HashMap<>(); - this.topicPartitionMetricsMap = new HashMap<>(); + LOG.info("Running KafkaOffsetMetricManager"); } @@ -58,14 +55,9 @@ public void registerPartitionAndTopicLevelMetrics(Set newAssignm KafkaOffsetPartitionAndTopicMetrics topicPartitionMetricSet = new 
KafkaOffsetPartitionAndTopicMetrics(offsetManagerSupplier, adminSupplier, newAssignment); + + this.kafkaOffsetPartitionAndTopicMetrics = topicPartitionMetricSet; topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet); } - public Map getTopicPartitionMetricsMap() { - return topicPartitionMetricsMap; - } - - public Map getTopicMetricsMap() { - return topicMetricsMap; - } } From e448b53553af24b57a8aa1c74479dbeb2344c6ba Mon Sep 17 00:00:00 2001 From: Rui Abreu Date: Fri, 25 Apr 2025 08:30:30 +0100 Subject: [PATCH 17/17] Adding unit tests (cherry picked from commit 781baa10586c4da09e48e4ecb953892efcb6c3b9) --- .../metrics2/KafkaOffsetMetricManager.java | 5 ++++ .../metrics2/KafkaOffsetTopicMetrics.java | 23 +--------------- .../spout/KafkaSpoutReactivationTest.java | 26 +++++++++++++++++++ 3 files changed, 32 insertions(+), 22 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java index dd96f6d1887..e871277084a 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java @@ -38,6 +38,7 @@ public class KafkaOffsetMetricManager { private final Supplier> offsetManagerSupplier; private final Supplier adminSupplier; private TopologyContext topologyContext; + private KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics; @@ -60,4 +61,8 @@ public void registerPartitionAndTopicLevelMetrics(Set newAssignm topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet); } + public KafkaOffsetPartitionAndTopicMetrics getKafkaOffsetPartitionAndTopicMetrics() { + return kafkaOffsetPartitionAndTopicMetrics; + } + } diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java index cd0fbfc6b2c..900bb86ea63 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java @@ -41,7 +41,7 @@ public class KafkaOffsetTopicMetrics implements MetricSet { private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetTopicMetrics.class); - private String topic; + private final String topic; long totalSpoutLag; long totalEarliestTimeOffset; long totalLatestTimeOffset; @@ -115,25 +115,4 @@ public Long getValue() { metrics.put(topic + "/" + "totalRecordsInPartitions", totalRecordsInPartitionsGauge); return metrics; } - - private class TopicMetrics { - long totalSpoutLag = 0L; - long totalEarliestTimeOffset = 0L; - long totalLatestTimeOffset = 0L; - long totalLatestEmittedOffset = 0L; - long totalLatestCompletedOffset = 0L; - long totalRecordsInPartitions = 0L; - - public void incrementTotalSpoutLag(long offset) { - totalSpoutLag += offset; - } - - public void incrementTotalEarliestTimeOffset(long offset) { - totalEarliestTimeOffset += offset; - } - - public void incrementTotalLatestTimeOffset(long offset) { - totalLatestTimeOffset += offset; - } - } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java index b4cab2ace70..4e5702a39d9 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java @@ -29,9 +29,12 @@ import static 
org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; import java.util.HashMap; import java.util.Map; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -65,6 +68,7 @@ public class KafkaSpoutReactivationTest { private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class); private final long commitOffsetPeriodMs = 2_000; private Consumer consumerSpy; + private Admin adminSpy; private KafkaSpout spout; private final int maxPollRecords = 10; @@ -78,9 +82,12 @@ public void prepareSpout(int messageCount, FirstPollOffsetStrategy firstPollOffs .build(); ClientFactory clientFactory = new ClientFactoryDefault<>(); this.consumerSpy = spy(clientFactory.createConsumer(spoutConfig.getKafkaProps())); + this.adminSpy = spy(clientFactory.createAdmin(spoutConfig.getKafkaProps())); ClientFactory clientFactoryMock = mock(ClientFactory.class); when(clientFactoryMock.createConsumer(any())) .thenReturn(consumerSpy); + when(clientFactoryMock.createAdmin(any())) + .thenReturn(adminSpy); this.spout = new KafkaSpout<>(spoutConfig, clientFactoryMock, new TopicAssigner()); SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitExtension.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount); SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); @@ -147,4 +154,23 @@ public void testSpoutShouldResumeWhereItLeftOffWithEarliestStrategy() throws Exc //With earliest, the spout should also resume where it left off, rather than restart at the earliest offset. 
doReactivationTest(FirstPollOffsetStrategy.EARLIEST); } + + @Test + public void testSpoutMustHandleGettingMetricsWhileDeactivated() throws Exception { + //Storm will try to get metrics from the spout even while deactivated, so the spout must be able to handle this + prepareSpout(10, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST); + + for (int i = 0; i < 5; i++) { + KafkaSpoutMessageId msgId = emitOne(); + spout.ack(msgId); + } + + spout.deactivate(); + + Map offsetMetric = spout.getKafkaOffsetMetricManager().getKafkaOffsetPartitionAndTopicMetrics().getMetrics(); + Long partitionLag = (Long) ((Gauge) offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/partition_0/spoutLag")).getValue(); + Long spoutLag = (Long) ((Gauge) offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag")).getValue(); + assertThat(partitionLag, is(5L)); + assertThat(spoutLag, is(5L)); + } }