The Amazon SNS Java Messaging Library provides an asynchronous, batched messaging client for Amazon SNS, supporting both AWS SDK v1 (AmazonSNS) and v2 (SnsClient). It features configurable batching with linger time, FIFO ordering, message attributes, and Micrometer metrics.
For detailed architecture, threading model, batching behavior, and exception handling, see the Technical Guide.
Choose the batch size based on the size of individual messages, the available network bandwidth, and the latency and throughput improvements observed under real-life load. The defaults assume small messages and the batch size that is optimal for server-side processing.
Batching combines multiple requests to make optimal use of the network.
Article: Martin Fowler, Request Batch
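The trade-off between message size and batch size can be illustrated with a small, library-independent sketch. It splits a list of messages into batches bounded by both an entry count and a payload size. The class name `BatchPlanner`, the method `partition`, and the limits passed to it are hypothetical and are not part of this library's API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the library: plans batches by count and size.
final class BatchPlanner {

    private BatchPlanner() { }

    /**
     * Splits messages into batches of at most maxEntries messages and at most
     * maxBytes of UTF-8 payload. A single message larger than maxBytes ends up
     * in a batch of its own.
     */
    static List<List<String>> partition(List<String> messages, int maxEntries, int maxBytes) {
        if (maxEntries < 1 || maxBytes < 1) {
            throw new IllegalArgumentException("limits must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String message : messages) {
            int size = message.getBytes(StandardCharsets.UTF_8).length;
            boolean full = current.size() == maxEntries || currentBytes + size > maxBytes;
            // Close the current batch before adding a message that would exceed a limit.
            if (!current.isEmpty() && full) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(message);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("aaaa", "bbbb", "cccc");
        // Limited by entry count: two messages per batch.
        System.out.println(partition(messages, 2, 100));
        // Limited by payload size: 7 bytes per batch fits one 4-byte message.
        System.out.println(partition(messages, 10, 7));
    }
}
```

Larger messages reach the byte limit sooner, so the effective number of messages per batch drops as message size grows.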
- Compatible with JDK 8, 11, 17, 21 and 25
- Compatible with AWS SDK v1 >= 1.12
- Compatible with AWS SDK v2 >= 2.18

This library also supports Kotlin.
To use Amazon SNS Java Messaging Lib in a Maven project, add the dependency that matches your AWS SDK version to your pom.xml. Amazon SNS Java Messaging Lib has no other dependencies, so it will not pull unwanted libraries into your project.
You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v1</artifactId>
<version>1.3.0</version>
</dependency>

<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v2</artifactId>
<version>1.3.0</version>
</dependency>

If you want to try a snapshot version, add the following repository:
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshots</name>
<url>https://central.sonatype.com/repository/maven-snapshots</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>

For Gradle:

implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.3.0'

implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.3.0'

If you want to try a snapshot version, add the following repository:
repositories {
maven {
url "https://central.sonatype.com/repository/maven-snapshots"
}
}

| Property | Type | Description |
|---|---|---|
| fifo | boolean | Whether the SNS topic is a FIFO topic. |
| maximumPoolSize | int | Maximum number of producer threads. |
| topicArn | string | ARN of the SNS topic. |
| linger | int | Time to wait before sending messages out to SNS. |
| maxBatchSize | int | Maximum number of messages to collect before sending the batch. |
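How `linger` and `maxBatchSize` interact can be pictured with a minimal sketch: a batch closes when it is full or when the linger time runs out, whichever comes first. This is an illustration built only on `java.util.concurrent`, not the library's actual implementation. The class `LingerBatcher`, the method `nextBatch`, and the assumption that linger is measured in milliseconds are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the library's implementation.
final class LingerBatcher {

    private LingerBatcher() { }

    /**
     * Collects up to maxBatchSize items, waiting at most lingerMillis in total.
     * Returns early when the batch is full. May return an empty list.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxBatchSize, long lingerMillis) {
        List<T> batch = new ArrayList<>(maxBatchSize);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(lingerMillis);
        try {
            while (batch.size() < maxBatchSize) {
                // Once the deadline has passed, poll with a zero timeout so only
                // items that are already queued are taken.
                long remaining = Math.max(0L, deadline - System.nanoTime());
                T item = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // linger time elapsed and nothing more is queued
                }
                batch.add(item);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and hand back what was collected so far.
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 25; i++) {
            queue.add(i);
        }
        // The first two batches fill up immediately; the last one waits out the linger time.
        for (int i = 0; i < 3; i++) {
            System.out.println(nextBatch(queue, 10, 100).size());
        }
    }
}
```

A longer linger gives batches more time to fill at the cost of added latency for the first message in each batch.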
NOTICE: the in-memory message buffer is sized as maximumPoolSize * maxBatchSize, so very large values require a correspondingly large amount of memory.
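As a rough aid for sizing, the sketch below applies the maximumPoolSize * maxBatchSize rule to estimate how many messages may be buffered and how much memory that could take. The class `BufferEstimate`, its methods, and the average message size are hypothetical. Real memory use also depends on object overhead, which this estimate ignores.

```java
// Hypothetical sizing helper, not part of the library.
final class BufferEstimate {

    private BufferEstimate() { }

    /** Number of buffered messages according to maximumPoolSize * maxBatchSize. */
    static long bufferedMessages(int maximumPoolSize, int maxBatchSize) {
        return Math.multiplyExact((long) maximumPoolSize, (long) maxBatchSize);
    }

    /** Rough payload estimate in bytes, given an assumed average message size. */
    static long estimatedBytes(int maximumPoolSize, int maxBatchSize, long averageMessageBytes) {
        return Math.multiplyExact(bufferedMessages(maximumPoolSize, maxBatchSize), averageMessageBytes);
    }

    public static void main(String[] args) {
        // 20 threads * 10 messages per batch = 200 buffered messages.
        System.out.println(bufferedMessages(20, 10));
        // With an assumed average of 4096 bytes per message: 200 * 4096 = 819200 bytes.
        System.out.println(estimatedBytes(20, 10, 4096L));
    }
}
```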
// Use a custom bounded queue for pending requests
final TopicProperty topicProperty = TopicProperty.builder()
.fifo(false)
.linger(100)
.maxBatchSize(10)
.maximumPoolSize(20)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
AmazonSnsTemplate<MyMessage> snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty)
.topicRequests(new LinkedBlockingQueue<>(100))
.build();

// Use a custom ObjectMapper
final TopicProperty topicProperty = TopicProperty.builder()
.fifo(false)
.linger(100)
.maxBatchSize(10)
.maximumPoolSize(20)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
AmazonSnsTemplate<MyMessage> snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty)
.objectMapper(new ObjectMapper())
.build();

// Use a custom queue together with a custom ObjectMapper
final TopicProperty topicProperty = TopicProperty.builder()
.fifo(false)
.linger(100)
.maxBatchSize(10)
.maximumPoolSize(20)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
AmazonSnsTemplate<MyMessage> snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty)
.topicRequests(new LinkedBlockingQueue<>(100))
.objectMapper(new ObjectMapper())
.build();

// Record metrics in a Micrometer MeterRegistry
final TopicProperty topicProperty = TopicProperty.builder()
.fifo(false)
.linger(100)
.maxBatchSize(10)
.maximumPoolSize(20)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
AmazonSnsTemplate<MyMessage> snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty)
.meterRegistry(new SimpleMeterRegistry())
.build();

// Send a message to a standard topic
final TopicProperty topicProperty = TopicProperty.builder()
.fifo(false)
.linger(100)
.maxBatchSize(10)
.maximumPoolSize(20)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
AmazonSnsTemplate<MyMessage> snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build();
final RequestEntry<MyMessage> requestEntry = RequestEntry.builder()
.withValue(new MyMessage())
.withMessageHeaders(Map.of())
.build();
snsTemplate.send(requestEntry);

// Send a message to a FIFO topic with a group ID and a deduplication ID
final TopicProperty topicProperty = TopicProperty.builder()
.fifo(true)
.linger(100)
.maxBatchSize(10)
.maximumPoolSize(20)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
AmazonSnsTemplate<MyMessage> snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build();
final RequestEntry<MyMessage> requestEntry = RequestEntry.builder()
.withValue(new MyMessage())
.withMessageHeaders(Map.of())
.withGroupId(UUID.randomUUID().toString())
.withDeduplicationId(UUID.randomUUID().toString())
.build();
snsTemplate.send(requestEntry);

// Handle the result with success and failure callbacks
final TopicProperty topicProperty = TopicProperty.builder()
.fifo(true)
.linger(100)
.maxBatchSize(10)
.maximumPoolSize(20)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
AmazonSnsTemplate<MyMessage> snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build();
final RequestEntry<MyMessage> requestEntry = RequestEntry.builder()
.withValue(new MyMessage())
.withMessageHeaders(Map.of())
.withGroupId(UUID.randomUUID().toString())
.withDeduplicationId(UUID.randomUUID().toString())
.build();
snsTemplate.send(requestEntry).addCallback(
success -> LOGGER.info("Sent: {}", success.getMessageId()),
failure -> LOGGER.error("Failed: {} [{}]", failure.getMessage(), failure.getCode())
);
snsTemplate.send(requestEntry).addCallback(
success -> LOGGER.info("Sent: {}", success.getMessageId())
);

// Wait for await() to complete after sending
final TopicProperty topicProperty = TopicProperty.builder()
.fifo(true)
.linger(100)
.maxBatchSize(10)
.maximumPoolSize(20)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
AmazonSnsTemplate<MyMessage> snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build();
final RequestEntry<MyMessage> requestEntry = RequestEntry.builder()
.withValue(new MyMessage())
.withMessageHeaders(Map.of())
.withGroupId(UUID.randomUUID().toString())
.withDeduplicationId(UUID.randomUUID().toString())
.build();
snsTemplate.send(requestEntry).addCallback(
success -> LOGGER.info("Sent: {}", success.getMessageId()),
failure -> LOGGER.error("Failed: {} [{}]", failure.getMessage(), failure.getCode())
);
snsTemplate.await().join();

// Shut the template down after sending
final TopicProperty topicProperty = TopicProperty.builder()
.fifo(true)
.linger(100)
.maxBatchSize(10)
.maximumPoolSize(20)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
AmazonSnsTemplate<MyMessage> snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build();
final RequestEntry<MyMessage> requestEntry = RequestEntry.builder()
.withValue(new MyMessage())
.withMessageHeaders(Map.of())
.withGroupId(UUID.randomUUID().toString())
.withDeduplicationId(UUID.randomUUID().toString())
.build();
snsTemplate.send(requestEntry).addCallback(
success -> LOGGER.info("Sent: {}", success.getMessageId()),
failure -> LOGGER.error("Failed: {} [{}]", failure.getMessage(), failure.getCode())
);
snsTemplate.shutdown();

// Complete example combining the options above
TopicProperty topicProperty = TopicProperty.builder()
.fifo(false)
.linger(100L)
.maxBatchSize(10)
.maximumPoolSize(5)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
AmazonSnsTemplate<MyMessage> template = AmazonSnsTemplate.builder(snsClient, topicProperty)
.meterRegistry(new SimpleMeterRegistry())
.topicRequests(new RingBufferBlockingQueue<>(1024))
.objectMapper(new ObjectMapper())
.build();
template.send(RequestEntry.<MyMessage>builder()
.withValue(new MyMessage("hello"))
.withMessageHeaders(Map.of("source", "app-1"))
.withGroupId(UUID.randomUUID().toString())
.build());
template.await().thenRun(template::shutdown).join();

When a MeterRegistry is provided via the builder, the library records these Micrometer metrics:
Tags: topic = <topicArn>
| Metric | Type | Description |
|---|---|---|
| sns.publish.attempts | Counter | Total PublishBatch attempts |
| sns.publish.success | Counter | Successful messages |
| sns.publish.failure | Counter | Failed messages (dynamic tags: error_code, error_type) |
| sns.publish.duration | Timer | Publish latency (p50/p95/p99) |
| sns.publish.batch.size | DistributionSummary | Messages per batch |
| sns.publish.inflight | Gauge | In-flight publish batches |
Tags: name = <queueName>
| Metric | Type | Description |
|---|---|---|
| blocking.queue.puts.total | Counter | Successful put operations |
| blocking.queue.puts.failed | Counter | Put operations that threw an exception |
| blocking.queue.put.duration | Timer | Put latency (percentile histogram) |
| blocking.queue.takes.total | Counter | Successful take operations |
| blocking.queue.takes.failed | Counter | Take operations that threw an exception |
| blocking.queue.take.duration | Timer | Take latency (percentile histogram) |
| blocking.queue.size | Gauge | Current queue depth |
Tags: name = <executorName>
| Metric | Type | Description |
|---|---|---|
| executor.active | Gauge | Tasks currently executing |
| executor.tasks.succeeded | Counter | Tasks completed without exception |
| executor.tasks.failed | Counter | Tasks completed with exception |
| executor.task.duration | Timer | Task wall-clock duration |
Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.
We use GitHub for versioning. For the versions available, see the tags on this repository.
- Marcos Vallim - Founder, Author, Development, Test, Documentation - mvallim
See also the list of contributors who participated in this project.
This project is licensed under the Apache License. See the LICENSE file for details.