diff --git a/spring-kafka-3/pom.xml b/spring-kafka-3/pom.xml
index 894eab25765f..2d6def9c2cd2 100644
--- a/spring-kafka-3/pom.xml
+++ b/spring-kafka-3/pom.xml
@@ -22,6 +22,7 @@
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
+            <version>2.9.13</version>
             <groupId>com.fasterxml.jackson.core</groupId>
@@ -50,6 +51,16 @@
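             <version>${awaitility.version}</version>
             <scope>test</scope>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <version>${jackson-datatype.version}</version>
+        </dependency>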
@@ -57,5 +68,6 @@
3.0.12
1.19.3
4.2.0
+        <jackson-datatype.version>2.13.5</jackson-datatype.version>
diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java
new file mode 100644
index 000000000000..339f18677a81
--- /dev/null
+++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java
@@ -0,0 +1,93 @@
+package com.baeldung.spring.kafka.delay;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
+import org.springframework.kafka.listener.KafkaBackoffException;
+import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
+import org.springframework.kafka.listener.MessageListener;
+import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
+import org.springframework.kafka.support.Acknowledgment;
+
+public class DelayedMessageListenerAdapter extends AbstractDelegatingMessageListenerAdapter> implements AcknowledgingConsumerAwareMessageListener {
+
+ private static final Duration DEFAULT_DELAY_VALUE = Duration.of(0, ChronoUnit.SECONDS);
+
+ private final String listenerId;
+
+ private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager;
+
+ private final Map delaysPerTopic = new ConcurrentHashMap<>();
+
+ private Duration defaultDelay = DEFAULT_DELAY_VALUE;
+
+ public DelayedMessageListenerAdapter(MessageListener delegate, KafkaConsumerBackoffManager kafkaConsumerBackoffManager, String listenerId) {
+ super(delegate);
+ Objects.requireNonNull(kafkaConsumerBackoffManager, "kafkaConsumerBackoffManager cannot be null");
+ Objects.requireNonNull(listenerId, "listenerId cannot be null");
+ this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager;
+ this.listenerId = listenerId;
+ }
+
+ @Override
+ public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment, Consumer, ?> consumer) throws KafkaBackoffException {
+ this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord, consumerRecord.timestamp() + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay)
+ .toMillis(), consumer));
+ invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
+ }
+
+ public void setDelayForTopic(String topic, Duration delay) {
+ Objects.requireNonNull(topic, "Topic cannot be null");
+ Objects.requireNonNull(delay, "Delay cannot be null");
+ this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
+ this.delaysPerTopic.put(topic, delay);
+ }
+
+ public void setDefaultDelay(Duration delay) {
+ Objects.requireNonNull(delay, "Delay cannot be null");
+ this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
+ this.defaultDelay = delay;
+ }
+
+ private void invokeDelegateOnMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment, Consumer, ?> consumer) {
+ switch (this.delegateType) {
+ case ACKNOWLEDGING_CONSUMER_AWARE:
+ this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
+ break;
+ case ACKNOWLEDGING:
+ this.delegate.onMessage(consumerRecord, acknowledgment);
+ break;
+ case CONSUMER_AWARE:
+ this.delegate.onMessage(consumerRecord, consumer);
+ break;
+ case SIMPLE:
+ this.delegate.onMessage(consumerRecord);
+ }
+ }
+
+ private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord data, long nextExecutionTimestamp, Consumer, ?> consumer) {
+ return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, this.listenerId, new TopicPartition(data.topic(), data.partition()), consumer);
+ }
+
+ @Override
+ public void onMessage(ConsumerRecord data) {
+ onMessage(data, null, null);
+ }
+
+ @Override
+ public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
+ onMessage(data, acknowledgment, null);
+ }
+
+ @Override
+ public void onMessage(ConsumerRecord data, Consumer, ?> consumer) {
+ onMessage(data, null, consumer);
+ }
+}
\ No newline at end of file
diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java
new file mode 100644
index 000000000000..f84b7d3f2ad5
--- /dev/null
+++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java
@@ -0,0 +1,64 @@
+package com.baeldung.spring.kafka.delay;
+
+import java.time.Duration;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManager;
+import org.springframework.kafka.listener.ContainerPausingBackOffHandler;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
+import org.springframework.kafka.listener.ListenerContainerPauseService;
+import org.springframework.kafka.listener.ListenerContainerRegistry;
+import org.springframework.kafka.listener.MessageListener;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+@Configuration
+public class KafkaConsumerConfig {
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory