这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apache-kafka-3/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## Apache Kafka

This module contains articles about Apache Kafka.

##### Building the project
You can build the project from the command line using: *mvn clean install*, or in an IDE.

### Relevant Articles:

48 changes: 48 additions & 0 deletions apache-kafka-3/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>apache-kafka-3</artifactId>
<name>apache-kafka-3</name>

<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.databind.version}</version>
</dependency>
</dependencies>

<properties>
<kafka.version>3.6.1</kafka.version>
<jackson.databind.version>2.15.2</jackson.databind.version>
</properties>
<profiles>
<profile>
<id>integration-jdk9-and-above</id>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.baeldung.kafka.commitoffset;

import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties;

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AsyncCommit {

public static void main(String[] args) {

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties());
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {
// processed message
consumer.commitAsync();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.baeldung.kafka.commitoffset;

import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties;

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutomaticCommit {

public static void main(String[] args) {

Properties properties = KafkaConfigProperties.getProperties();
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {
// processed message
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.baeldung.kafka.commitoffset;

import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SpecificOffsetCommit {
public static void main(String[] args) {

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties());
consumer.subscribe(KafkaConfigProperties.getTopic());
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int messageProcessed = 0;
while (true) {
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {
// processed message
messageProcessed++;
currentOffsets.put(new TopicPartition(message.topic(), message.partition()), new OffsetAndMetadata(message.offset() + 1));
if (messageProcessed % 50 == 0) {
consumer.commitSync(currentOffsets);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.baeldung.kafka.commitoffset;

import com.baeldung.kafka.commitoffset.config.KafkaConfigProperties;

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SyncCommit {

public static void main(String[] args) {

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(KafkaConfigProperties.getProperties());
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {
// processed message
consumer.commitSync();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.baeldung.kafka.commitoffset.config;

import java.util.ArrayList;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
* @author amitkumar
*/
public class KafkaConfigProperties {
public static final String MY_TOPIC = "my-topic";

public static Properties getProperties() {

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyFirstConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return props;
}

public static ArrayList<String> getTopic() {
ArrayList<String> topics = new ArrayList<>();
topics.add(MY_TOPIC);
return topics;
}
}
11 changes: 11 additions & 0 deletions apache-kafka-3/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@
<module>apache-httpclient4</module>
<module>apache-httpclient</module>
<module>apache-kafka-2</module>
<module>apache-kafka-3</module>
<module>apache-kafka</module>
<module>apache-libraries-2</module>
<module>apache-libraries</module>
Expand Down Expand Up @@ -930,6 +931,7 @@
<module>apache-httpclient4</module>
<module>apache-httpclient</module>
<module>apache-kafka-2</module>
<module>apache-kafka-3</module>
<module>apache-kafka</module>
<module>apache-libraries-2</module>
<module>apache-libraries</module>
Expand Down