这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions spring-kafka-3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.13</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down Expand Up @@ -50,12 +51,23 @@
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson-datatype.version}</version>
</dependency>
</dependencies>

<properties>
<java.version>17</java.version>
<kafka-version>3.0.12</kafka-version>
<testcontainers.version>1.19.3</testcontainers.version>
<awaitility.version>4.2.0</awaitility.version>
<jackson-datatype.version>2.13.5</jackson-datatype.version>
</properties>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.baeldung.spring.kafka.delay;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.KafkaBackoffException;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;

/**
 * Listener adapter that consults a {@link KafkaConsumerBackoffManager} before handing a
 * record to the wrapped delegate.
 *
 * <p>For every record the adapter computes {@code record.timestamp() + delay}, where the
 * delay is the one registered for the record's topic (or the default delay when the topic
 * has no entry), and passes that value to the back-off manager as the next execution
 * timestamp. The delegate is invoked only after {@code backOffIfNecessary} returns.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class DelayedMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>>
        implements AcknowledgingConsumerAwareMessageListener<K, V> {

    /** Delay applied when neither a topic-specific nor a custom default delay is set. */
    private static final Duration DEFAULT_DELAY_VALUE = Duration.ZERO;

    private final String listenerId;

    private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager;

    /** Topic name to delay; topics without an entry fall back to {@link #defaultDelay}. */
    private final Map<String, Duration> delaysPerTopic = new ConcurrentHashMap<>();

    private Duration defaultDelay = DEFAULT_DELAY_VALUE;

    /**
     * Creates an adapter around the given delegate.
     *
     * @param delegate the listener that finally receives the records
     * @param kafkaConsumerBackoffManager manager used to create back-off contexts and to back off; must not be null
     * @param listenerId id handed to the back-off manager when creating a context; must not be null
     * @throws NullPointerException if the back-off manager or the listener id is null
     */
    public DelayedMessageListenerAdapter(MessageListener<K, V> delegate, KafkaConsumerBackoffManager kafkaConsumerBackoffManager, String listenerId) {
        super(delegate);
        this.kafkaConsumerBackoffManager = Objects.requireNonNull(kafkaConsumerBackoffManager, "kafkaConsumerBackoffManager cannot be null");
        this.listenerId = Objects.requireNonNull(listenerId, "listenerId cannot be null");
    }

    /** Delegates to the three-argument variant with no acknowledgment and no consumer. */
    @Override
    public void onMessage(ConsumerRecord<K, V> data) {
        onMessage(data, null, null);
    }

    /** Delegates to the three-argument variant with no consumer. */
    @Override
    public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment) {
        onMessage(data, acknowledgment, null);
    }

    /** Delegates to the three-argument variant with no acknowledgment. */
    @Override
    public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
        onMessage(data, null, consumer);
    }

    /**
     * Asks the back-off manager to back off if needed, then forwards the record to the delegate.
     *
     * @param consumerRecord the received record
     * @param acknowledgment acknowledgment handle, forwarded to delegates that accept one
     * @param consumer the consumer, used for the back-off context and forwarded to delegates that accept one
     * @throws KafkaBackoffException declared for the back-off manager call
     */
    @Override
    public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) throws KafkaBackoffException {
        Duration topicDelay = this.delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay);
        long dueTimestamp = consumerRecord.timestamp() + topicDelay.toMillis();
        TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());

        KafkaConsumerBackoffManager.Context backOffContext =
                this.kafkaConsumerBackoffManager.createContext(dueTimestamp, this.listenerId, partition, consumer);
        this.kafkaConsumerBackoffManager.backOffIfNecessary(backOffContext);

        invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
    }

    /**
     * Registers the delay to use for records of one topic, replacing any earlier value.
     *
     * @param topic topic name; must not be null
     * @param delay delay added to the record timestamp; must not be null
     */
    public void setDelayForTopic(String topic, Duration delay) {
        Objects.requireNonNull(topic, "Topic cannot be null");
        Objects.requireNonNull(delay, "Delay cannot be null");
        logDelayChange(delay);
        this.delaysPerTopic.put(topic, delay);
    }

    /**
     * Sets the delay used for topics that have no topic-specific delay.
     *
     * @param delay delay added to the record timestamp; must not be null
     */
    public void setDefaultDelay(Duration delay) {
        Objects.requireNonNull(delay, "Delay cannot be null");
        logDelayChange(delay);
        this.defaultDelay = delay;
    }

    /** Emits the debug line shared by both delay setters. */
    private void logDelayChange(Duration delay) {
        this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
    }

    /** Calls the delegate overload that matches the delegate type of this adapter. */
    private void invokeDelegateOnMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        switch (this.delegateType) {
            case SIMPLE:
                this.delegate.onMessage(consumerRecord);
                break;
            case CONSUMER_AWARE:
                this.delegate.onMessage(consumerRecord, consumer);
                break;
            case ACKNOWLEDGING:
                this.delegate.onMessage(consumerRecord, acknowledgment);
                break;
            case ACKNOWLEDGING_CONSUMER_AWARE:
                this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
                break;
            default:
                break;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.baeldung.spring.kafka.delay;

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManager;
import org.springframework.kafka.listener.ContainerPausingBackOffHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.ListenerContainerPauseService;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

/**
 * Consumer-side configuration: a listener container factory whose containers get their
 * message listener wrapped in a {@link DelayedMessageListenerAdapter}, plus the
 * {@link ObjectMapper} and {@link TaskScheduler} beans.
 */
@Configuration
public class KafkaConsumerConfig {

    /**
     * Builds the listener container factory.
     *
     * <p>The factory uses {@code RECORD} ack mode and a container customizer that replaces
     * each container's listener with a delaying adapter (10 seconds for {@code web.orders},
     * zero for every other topic).
     *
     * @param consumerFactory consumer factory assigned to the container factory
     * @param registry listener container registry handed to the back-off manager
     * @param scheduler scheduler handed to the container pause service
     * @return the configured container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory, ListenerContainerRegistry registry, TaskScheduler scheduler) {
        KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler);

        ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory);

        ContainerProperties containerProperties = containerFactory.getContainerProperties();
        containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);

        containerFactory.setContainerCustomizer(container -> installDelayedListener(backOffManager, container));
        return containerFactory;
    }

    /**
     * Creates an {@link ObjectMapper} with the {@link JavaTimeModule} registered and
     * {@code READ_DATE_TIMESTAMPS_AS_NANOSECONDS} switched off.
     */
    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
        return mapper;
    }

    /** Provides the scheduler injected into {@link #kafkaListenerContainerFactory}. */
    @Bean
    public TaskScheduler taskScheduler() {
        return new ThreadPoolTaskScheduler();
    }

    /** Wraps the container's current listener in a delaying adapter and installs the adapter on the container. */
    private void installDelayedListener(KafkaConsumerBackoffManager backOffManager, ConcurrentMessageListenerContainer<Object, Object> container) {
        DelayedMessageListenerAdapter<Object, Object> adapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container);
        adapter.setDelayForTopic("web.orders", Duration.ofSeconds(10));
        adapter.setDefaultDelay(Duration.ZERO);
        container.setupMessageListener(adapter);
    }

    /** Builds a delaying adapter around the listener currently stored in the container properties. */
    @SuppressWarnings("unchecked")
    private DelayedMessageListenerAdapter<Object, Object> wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager, ConcurrentMessageListenerContainer<Object, Object> container) {
        // The container properties expose the listener as a plain Object, hence the unchecked cast.
        Object currentListener = container.getContainerProperties().getMessageListener();
        MessageListener<Object, Object> delegate = (MessageListener<Object, Object>) currentListener;
        String listenerId = container.getListenerId();
        return new DelayedMessageListenerAdapter<>(delegate, backOffManager, listenerId);
    }

    /** Assembles the back-off manager from a pause service and a pausing back-off handler. */
    private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) {
        ListenerContainerPauseService pauseService = new ListenerContainerPauseService(registry, scheduler);
        ContainerPausingBackOffHandler backOffHandler = new ContainerPausingBackOffHandler(pauseService);
        return new ContainerPartitionPausingBackOffManager(registry, backOffHandler);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.baeldung.spring.kafka.delay;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * Spring Boot entry point for the delayed Kafka consumer example.
 */
@SpringBootApplication
@EnableKafka
public class KafkaDelayApplication {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments, passed through to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaDelayApplication.class, args);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.baeldung.spring.kafka.delay;

import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Order payload carried in the Kafka messages.
 *
 * <p>Lombok generates accessors, a builder, a no-args constructor and an all-args
 * constructor; the all-args constructor takes the fields in declaration order, so the
 * field order below must not be changed casually.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Order {

    /** Identifier of the order. */
    private UUID orderId;

    /** Date and time at which the order was generated. */
    private LocalDateTime orderGeneratedDateTime;

    /** Date and time at which the order was processed. */
    private LocalDateTime orderProcessedTime;

    /** Address of the order, held as a list of strings. */
    private List<String> address;

    // NOTE(review): price is a double; if this represents money, BigDecimal would avoid rounding issues.
    /** Price of the order. */
    private double price;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.baeldung.spring.kafka.delay;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.listener.KafkaBackoffException;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import groovy.util.logging.Slf4j;

/**
 * Kafka listener that parses order messages and hands confirmed orders to the
 * {@link OrderService}.
 */
// NOTE(review): the Slf4j annotation imported by this file is groovy.util.logging.Slf4j.
// Lombok's lombok.extern.slf4j.Slf4j was presumably intended; no logger is used in this
// class, so confirm and fix the import before adding log statements.
@Slf4j
@Component
public class OrderListener {

    private final OrderService orderService;

    private final ObjectMapper objectMapper;

    /**
     * @param orderService service used to look up the order status and to process orders
     * @param objectMapper mapper used to deserialize the JSON payload into an {@link Order}
     */
    public OrderListener(OrderService orderService, ObjectMapper objectMapper) {
        this.orderService = orderService;
        this.objectMapper = objectMapper;
    }

    /**
     * Handles one message from the {@code web.orders} or {@code web.internal.orders} topic.
     *
     * <p>The payload is deserialized into an {@link Order}; the order is processed only when
     * the service reports its status as {@code ORDER_CONFIRMED}. Any other status, including
     * a missing ({@code null}) status, leaves the order unprocessed.
     *
     * @param order the JSON text of the order
     * @throws JsonProcessingException if the payload cannot be read as an {@link Order}
     */
    @RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT)
    @KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders")
    public void handleOrders(String order) throws JsonProcessingException {
        Order orderDetails = objectMapper.readValue(order, Order.class);
        OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId());
        // Enum constants are singletons, so == is exact and, unlike equals(), cannot throw on a null status.
        if (orderStatus == OrderService.Status.ORDER_CONFIRMED) {
            orderService.processOrder(orderDetails);
        }
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.baeldung.spring.kafka.delay;

import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.springframework.stereotype.Service;

/**
 * In-memory order service: records processed orders in a map keyed by order id.
 */
@Service
public class OrderService {

    // processOrder is reached from the Kafka listener while getOrders hands the same map to
    // other callers, so the map is wrapped in a synchronized view. A synchronized HashMap
    // (rather than ConcurrentHashMap) keeps accepting a null order id as a key.
    final Map<UUID, Order> orders = Collections.synchronizedMap(new HashMap<>());

    /**
     * Looks up the status of an order.
     *
     * @param orderId id of the order; currently not consulted
     * @return always {@link Status#ORDER_CONFIRMED}
     */
    public Status findStatusById(UUID orderId) {
        return Status.ORDER_CONFIRMED;
    }

    /**
     * Stamps the order with the current local date-time as its processed time and stores it
     * under its order id, replacing any order previously stored under that id.
     *
     * @param order the order to process
     */
    public void processOrder(Order order) {
        order.setOrderProcessedTime(LocalDateTime.now());
        orders.put(order.getOrderId(), order);
    }

    /**
     * Returns the live map of processed orders (not a copy).
     *
     * <p>Single operations such as {@code get} or {@code containsKey} are synchronized;
     * callers that iterate over the map must synchronize on the returned map themselves.
     *
     * @return the processed orders keyed by order id
     */
    public Map<UUID, Order> getOrders() {
        return orders;
    }

    /** Status values an order can have. */
    enum Status {
        CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, DELETED
    }
}
Loading