这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.baeldung.partitioningstrategy;
package com.baeldung.spring.kafka.partitioningstrategy;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {
private static final int PREMIUM_PARTITION = 0;
private static final int NORMAL_PARTITION = 1;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package com.baeldung.partitioningstrategy;
package com.baeldung.spring.kafka.partitioningstrategy;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -12,9 +15,6 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@SpringBootApplication
public class KafkaApplication {

Expand All @@ -26,7 +26,7 @@ public KafkaTemplate<String, String> kafkaTemplate() {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9095");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
Expand All @@ -35,7 +35,7 @@ public ProducerFactory<String, String> producerFactory() {
@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9095");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // Set a unique group ID
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package com.baeldung.partitioningstrategy;
package com.baeldung.spring.kafka.partitioningstrategy;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import jakarta.annotation.Nullable;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import jakarta.annotation.Nullable;

@Service
public class KafkaMessageConsumer {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.baeldung.partitioningstrategy;
package com.baeldung.spring.kafka.partitioningstrategy;

public class ReceivedMessage {
private final String key;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.baeldung.sasl;
package com.baeldung.spring.kafka.sasl;

import java.util.ArrayList;
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.baeldung.sasl;
package com.baeldung.spring.kafka.sasl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand Down
2 changes: 1 addition & 1 deletion spring-kafka-2/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094, localhost:9095
message.topic.name=baeldung
long.message.topic.name=longMessage
greeting.topic.name=greeting
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
package com.baeldung.partitioningstrategy;
package com.baeldung.spring.kafka.partitioningstrategy;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -16,19 +27,8 @@
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;

@SpringBootTest
@EmbeddedKafka(partitions = 3, brokerProperties = { "listeners=PLAINTEXT://localhost:9092" }, kraft = false)
@EmbeddedKafka(partitions = 3, brokerProperties = { "listeners=PLAINTEXT://localhost:9095" }, kraft = false)
public class KafkaApplicationIntegrationTest {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.baeldung.sasl;
package com.baeldung.spring.kafka.sasl;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -7,7 +7,7 @@

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = KafkaSaslApplication.class)
class SpringContextTest {
class SprintContextTest {

@Test
void whenSpringContextIsBootstrapped_thenNoExceptions() {
Expand Down