这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/examples/java/io/synadia/flink/examples/JetStreamExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
package io.synadia.flink.examples;

import io.nats.client.*;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.OrderedConsumerConfiguration;
import io.synadia.flink.examples.support.ExampleUtils;
import io.synadia.flink.examples.support.Publisher;
import io.synadia.flink.message.Utf8StringSinkConverter;
import io.synadia.flink.message.Utf8StringSourceConverter;
import io.synadia.flink.sink.JetStreamSink;
import io.synadia.flink.sink.JetStreamSinkBuilder;
import io.synadia.flink.source.AckBehavior;
import io.synadia.flink.source.JetStreamSource;
import io.synadia.flink.source.JetStreamSourceBuilder;
import io.synadia.flink.source.JetStreamSubjectConfiguration;
Expand Down Expand Up @@ -47,8 +49,8 @@ public class JetStreamExample {

// ------------------------------------------------------------------------------------------
// The quiet period is how long to wait when not receiving messages to end the program.
// Set the quiet period longer if you are using ack mode. See notes on ACK_MODE below.
// Try 3000 or 10000 in ack mode.
// Set the quiet period longer if you are using ack behavior. See notes on ACK_BEHAVIOR below.
// Try 3000, 10000 or 20000 depending on ack behavior.
// ------------------------------------------------------------------------------------------
public static final int QUIET_PERIOD = 3000;

Expand All @@ -66,11 +68,15 @@ public class JetStreamExample {
// ==========================================================================================

// ------------------------------------------------------------------------------------------
// ACK_MODE mode false means use a consumer with no acking
// ACK_MODE true true means the split(s) will ack (AckPolicy.All) messages at the checkpoint
// Try false or true
// Ack policy for the source.
// This is the policy that the source will use to acknowledge messages
// four options:
// AckBehavior.NoAck - no acks, messages are not acknowledged [default]
// AckBehavior.AckAll - one message ack, all messages are acknowledged, acked by source on checkpoints
// AckBehavior.AllButDoNotAck - Ack policy with all but acking is left to the user
// AckBehavior.ExplicitButDoNotAck - explicit acks, messages must be acknowledged explicitly
// ------------------------------------------------------------------------------------------
public static final boolean ACK_MODE = false;
public static final AckBehavior ACK_BEHAVIOR = AckBehavior.NoAck;

// ------------------------------------------------------------------------------------------
// <= 0 makes the source boundedness "Boundedness.CONTINUOUS_UNBOUNDED"
Expand Down Expand Up @@ -130,7 +136,7 @@ public static void main(String[] args) throws Exception {
.streamName(SOURCE_A_STREAM)
.subject(SOURCE_A_SUBJECT)
.maxMessagesToRead(MAX_MESSAGES_TO_READ)
.ackMode(ACK_MODE)
.ackBehavior(ACK_BEHAVIOR)
.build();
System.out.println("JetStreamSubjectConfiguration" + subjectConfigurationA.toJson());

Expand All @@ -145,7 +151,7 @@ public static void main(String[] args) throws Exception {
.streamName(SOURCE_B_STREAM)
.subject(SOURCE_B_SUBJECTS[0])
.maxMessagesToRead(MAX_MESSAGES_TO_READ)
.ackMode(ACK_MODE)
.ackBehavior(ACK_BEHAVIOR)
.build());
for (int x = 1; x < SOURCE_B_SUBJECTS.length; x++) {
subjectConfigurationsB.add(JetStreamSubjectConfiguration.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public class JetStreamExampleFromConfigFiles {

// ------------------------------------------------------------------------------------------
// The quiet period is how long to wait when not receiving messages to end the program.
// Set the quiet period longer if you have acks 10000 vs. 3000 for instance.
// Try 3000 or 10000
// Set the quiet period longer if you are using ack behavior. See notes on ACK_BEHAVIOR below.
// Try 3000, 10000 or 20000 depending on ack behavior.
// ------------------------------------------------------------------------------------------
public static final int QUIET_PERIOD = 3000;

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2023-2025 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.examples;

import io.nats.client.Message;
import io.nats.client.support.JsonUtils;
import io.synadia.flink.message.SourceConverter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.nio.charset.StandardCharsets;

/**
 * Source converter that turns a NATS {@link Message} into a small JSON document
 * carried as a {@code String}.
 *
 * <p>The document is built with {@link JsonUtils} and is given two fields:
 * {@code "data"} (the message payload decoded as UTF-8) and {@code "reply_to"}
 * (the value of {@link Message#getReplyTo()}).
 */
public class JetStreamExplicitSourceConnector implements SourceConverter<String> {
    private static final long serialVersionUID = 1L;

    /**
     * Builds the JSON representation of a single message.
     *
     * @param message the message to convert
     * @return the JSON text holding the payload and the reply-to subject
     */
    @Override
    public String convert(Message message) {
        // Decode the payload up front; the bytes are interpreted as UTF-8 text.
        String payloadText = new String(message.getData(), StandardCharsets.UTF_8);
        String replySubject = message.getReplyTo();

        StringBuilder json = JsonUtils.beginJson();
        JsonUtils.addField(json, "data", payloadText);
        // NOTE(review): how addField treats a null reply-to is decided by JsonUtils — confirm.
        JsonUtils.addField(json, "reply_to", replySubject);
        return JsonUtils.endJson(json).toString();
    }

    /**
     * @return type information for {@code String}, the type produced by {@link #convert(Message)}
     */
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2023-2025 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.examples.support;

import io.nats.client.Connection;
import io.nats.client.impl.AckType;
import io.nats.client.support.JsonParser;
import io.nats.client.support.JsonValue;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FlinkRuntimeException;

// =====================================================================================================================================
// This class simulates acknowledging messages in a Flink job that processes and acks NATS messages.
// It connects to a NATS server and sends an acknowledgment message back to the replyTo subject specified in the input JSON.
// The input JSON is expected to have a "data" field containing the message data and a "reply_to" field specifying where to send the ack.
// =====================================================================================================================================
public class AckMapFunction extends RichMapFunction<String, String> {
private static final byte[] ACK_BODY_BYTES = AckType.AckAck.bodyBytes(-1);
private transient Connection natsCtx;

@Override
public void open(Configuration parameters) throws Exception {
try {
this.natsCtx = ExampleUtils.connect(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE);
}
catch (Exception e){
System.out.println("Error connecting to NATS server: " + e);
throw new FlinkRuntimeException("Error connecting to NATS server", e);
}
}

@Override
public String map(String s) throws Exception {
try {
JsonValue parsed = JsonParser.parse(s);
JsonValue data = parsed.map.get("data");
if (data == null) {
throw new IllegalArgumentException("data field is missing in the input: " + s);
}

JsonValue replyTo = parsed.map.get("reply_to");
if (replyTo == null) {
throw new IllegalArgumentException("reply_to is missing in the input: " + s);
}

natsCtx.publish(replyTo.string, ACK_BODY_BYTES);
return data.string;
}
catch (Exception e) {
System.out.println("Error processing message: " + e);
throw new FlinkRuntimeException("Error processing message: " + s, e);
}
}

@Override
public void close() throws Exception {
if (natsCtx != null) {
natsCtx.close();
}
}
}

31 changes: 31 additions & 0 deletions src/examples/resources/js-explicit-source-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"source_converter_class_name": "io.synadia.flink.examples.JetStreamExplicitSourceConnector",
"jetstream_subject_configurations": [
{
"stream_name": "source-b",
"subject": "ub1",
"ack_behavior": "ExplicitButDoNotAck"
},
{
"stream_name": "source-b",
"subject": "ub3",
"ack_behavior": "ExplicitButDoNotAck"
},
{
"stream_name": "source-b",
"subject": "ub4",
"ack_behavior": "ExplicitButDoNotAck"
},
{
"stream_name": "source-a",
"subject": "ua",
"ack_behavior": "ExplicitButDoNotAck"
},
{
"stream_name": "source-b",
"subject": "ub2",
"ack_behavior": "ExplicitButDoNotAck"
}
]
}

19 changes: 19 additions & 0 deletions src/examples/resources/js-explicit-source-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
source_converter_class_name: io.synadia.flink.examples.JetStreamExplicitSourceConnector
jetstream_subject_configurations:
- stream_name: source-b
subject: ub1
ack_behavior: ExplicitButDoNotAck
- stream_name: source-b
subject: ub3
ack_behavior: ExplicitButDoNotAck
- stream_name: source-b
subject: ub4
ack_behavior: ExplicitButDoNotAck
- stream_name: source-a
subject: ua
ack_behavior: ExplicitButDoNotAck
- stream_name: source-b
subject: ub2
ack_behavior: ExplicitButDoNotAck

2 changes: 1 addition & 1 deletion src/examples/resources/js-source-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source_converter_class_name: io.synadia.flink.message.Utf8StringSourceConverter
jetstream_subject_configurations:
jetstream_subject_configurations:
- stream_name: source-a
subject: ua
- stream_name: source-b
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/io/synadia/flink/source/AckBehavior.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2023-2025 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.source;

import java.util.HashMap;
import java.util.Map;

/**
 * The acknowledgement behaviors a source can be configured with.
 *
 * <p>Each constant carries a string form that is identical to its name. That
 * string is what {@link #toString()} returns and what {@link #get(String)}
 * accepts, so a value survives a round trip through text.
 */
public enum AckBehavior {
    /** Messages are not acknowledged. */
    NoAck("NoAck"),
    /** Messages are acknowledged by the source; the examples describe this as happening on checkpoints. */
    AckAll("AckAll"),
    /** "All" ack policy, with the acking itself left to the user. */
    AllButDoNotAck("AllButDoNotAck"),
    /** Explicit ack policy, with the acking itself left to the user. */
    ExplicitButDoNotAck("ExplicitButDoNotAck");

    private final String behavior;

    // Lookup table from string form to constant. Declared after the constants and
    // filled by the static block below, which runs once all constants exist.
    private static final Map<String, AckBehavior> strEnumHash = new HashMap<>();

    AckBehavior(String p) {
        this.behavior = p;
    }

    /**
     * @return the string form of this behavior
     */
    @Override
    public String toString() {
        return this.behavior;
    }

    /**
     * Finds the behavior whose string form equals the given value.
     * The match is exact and case-sensitive.
     *
     * @param value the string form to look up; may be {@code null}
     * @return the matching behavior, or {@code null} when nothing matches
     */
    public static AckBehavior get(String value) {
        return strEnumHash.get(value);
    }

    static {
        for (AckBehavior env : values()) {
            strEnumHash.put(env.toString(), env);
        }
    }
}

Loading