diff --git a/src/examples/java/io/synadia/flink/examples/JetStreamExample.java b/src/examples/java/io/synadia/flink/examples/JetStreamExample.java index 12b2940..6d5f2cb 100644 --- a/src/examples/java/io/synadia/flink/examples/JetStreamExample.java +++ b/src/examples/java/io/synadia/flink/examples/JetStreamExample.java @@ -4,6 +4,7 @@ package io.synadia.flink.examples; import io.nats.client.*; +import io.nats.client.api.AckPolicy; import io.nats.client.api.OrderedConsumerConfiguration; import io.synadia.flink.examples.support.ExampleUtils; import io.synadia.flink.examples.support.Publisher; @@ -11,6 +12,7 @@ import io.synadia.flink.message.Utf8StringSourceConverter; import io.synadia.flink.sink.JetStreamSink; import io.synadia.flink.sink.JetStreamSinkBuilder; +import io.synadia.flink.source.AckBehavior; import io.synadia.flink.source.JetStreamSource; import io.synadia.flink.source.JetStreamSourceBuilder; import io.synadia.flink.source.JetStreamSubjectConfiguration; @@ -47,8 +49,8 @@ public class JetStreamExample { // ------------------------------------------------------------------------------------------ // The quiet period is how long to wait when not receiving messages to end the program. - // Set the quiet period longer if you are using ack mode. See notes on ACK_MODE below. - // Try 3000 or 10000 in ack mode. + // Set the quiet period longer if you are using ack behavior. See notes on ACK_BEHAVIOR below. + // Try 3000, 10000 or 20000 depending on ack behavior. 
// ------------------------------------------------------------------------------------------ public static final int QUIET_PERIOD = 3000; @@ -66,11 +68,15 @@ public class JetStreamExample { // ========================================================================================== // ------------------------------------------------------------------------------------------ - // ACK_MODE mode false means use a consumer with no acking - // ACK_MODE true true means the split(s) will ack (AckPolicy.All) messages at the checkpoint - // Try false or true + // Ack behavior for the source. + // This is the behavior that the source will use to acknowledge messages + // four options: + // AckBehavior.NoAck - no acks, messages are not acknowledged [default] + // AckBehavior.AckAll - one message ack, all messages are acknowledged, acked by source on checkpoints + // AckBehavior.AllButDoNotAck - ack policy is all, but acking is left to the user + // AckBehavior.ExplicitButDoNotAck - explicit acks, messages must be acknowledged explicitly // ------------------------------------------------------------------------------------------ - public static final boolean ACK_MODE = false; + public static final AckBehavior ACK_BEHAVIOR = AckBehavior.NoAck; // ------------------------------------------------------------------------------------------ // <= 0 makes the source boundedness "Boundedness.CONTINUOUS_UNBOUNDED" @@ -130,7 +136,7 @@ public static void main(String[] args) throws Exception { .streamName(SOURCE_A_STREAM) .subject(SOURCE_A_SUBJECT) .maxMessagesToRead(MAX_MESSAGES_TO_READ) - .ackMode(ACK_MODE) + .ackBehavior(ACK_BEHAVIOR) .build(); System.out.println("JetStreamSubjectConfiguration" + subjectConfigurationA.toJson()); @@ -145,7 +151,7 @@ public static void main(String[] args) throws Exception { .streamName(SOURCE_B_STREAM) .subject(SOURCE_B_SUBJECTS[0]) .maxMessagesToRead(MAX_MESSAGES_TO_READ) - .ackMode(ACK_MODE) + .ackBehavior(ACK_BEHAVIOR) .build()); for (int x = 1; x < 
SOURCE_B_SUBJECTS.length; x++) { subjectConfigurationsB.add(JetStreamSubjectConfiguration.builder() diff --git a/src/examples/java/io/synadia/flink/examples/JetStreamExampleFromConfigFiles.java b/src/examples/java/io/synadia/flink/examples/JetStreamExampleFromConfigFiles.java index a434632..752cec8 100644 --- a/src/examples/java/io/synadia/flink/examples/JetStreamExampleFromConfigFiles.java +++ b/src/examples/java/io/synadia/flink/examples/JetStreamExampleFromConfigFiles.java @@ -48,8 +48,8 @@ public class JetStreamExampleFromConfigFiles { // ------------------------------------------------------------------------------------------ // The quiet period is how long to wait when not receiving messages to end the program. - // Set the quiet period longer if you have acks 10000 vs. 3000 for instance. - // Try 3000 or 10000 + // Set the quiet period longer if you are using ack behavior. See notes on ACK_BEHAVIOR below. + // Try 3000, 10000 or 20000 depending on ack behavior. // ------------------------------------------------------------------------------------------ public static final int QUIET_PERIOD = 3000; diff --git a/src/examples/java/io/synadia/flink/examples/JetStreamExplicitButDoNotAckExample.java b/src/examples/java/io/synadia/flink/examples/JetStreamExplicitButDoNotAckExample.java new file mode 100644 index 0000000..0cdcb93 --- /dev/null +++ b/src/examples/java/io/synadia/flink/examples/JetStreamExplicitButDoNotAckExample.java @@ -0,0 +1,258 @@ +// Copyright (c) 2023-2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. 
+ +package io.synadia.flink.examples; + +import io.nats.client.*; +import io.nats.client.api.OrderedConsumerConfiguration; +import io.synadia.flink.examples.support.ExampleUtils; +import io.synadia.flink.examples.support.Publisher; +import io.synadia.flink.examples.support.AckMapFunction; +import io.synadia.flink.message.Utf8StringSinkConverter; +import io.synadia.flink.sink.JetStreamSink; +import io.synadia.flink.sink.JetStreamSinkBuilder; +import io.synadia.flink.source.AckBehavior; +import io.synadia.flink.source.JetStreamSource; +import io.synadia.flink.source.JetStreamSourceBuilder; +import io.synadia.flink.source.JetStreamSubjectConfiguration; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.synadia.flink.examples.JetStreamExampleHelper.*; +import static io.synadia.flink.examples.support.ExampleUtils.writeToFile; + +public class JetStreamExplicitButDoNotAckExample { + // ========================================================================================== + // General Configuration: Use these settings to change how the example runs + // ========================================================================================== + + // ------------------------------------------------------------------------------------------ + // This job name is used by flink for management, including the naming + // of threads, which might appear in logging. 
+ // ------------------------------------------------------------------------------------------ + public static final String JOB_NAME = "jse"; + + // ------------------------------------------------------------------------------------------ + // 0 or less don't report + // This is just set so you can see a reasonable amount of progress + // ------------------------------------------------------------------------------------------ + public static final int REPORT_FREQUENCY = 50000; + + // ------------------------------------------------------------------------------------------ + // The quiet period is how long to wait when not receiving messages to end the program. + // Set the quiet period longer if you are using ack behavior. See notes on ACK_BEHAVIOR below. + // Try 3000, 10000 or 20000 depending on ack behavior. + // ------------------------------------------------------------------------------------------ + public static final int QUIET_PERIOD = 40000; + + // ------------------------------------------------------------------------------------------ + // Locations where to write config files based on how the example gets configured. + // These files can be used in the JetStreamExampleFromConfigFiles example. + // ------------------------------------------------------------------------------------------ + public static final String SOURCE_CONFIG_FILE_JSON = "src/examples/resources/js-explicit-source-config.json"; + public static final String SOURCE_CONFIG_FILE_YAML = "src/examples/resources/js-explicit-source-config.yaml"; + + // ========================================================================================== + // JetStreamSource Configuration: Use these settings to change how the source is configured + // ========================================================================================== + + // ------------------------------------------------------------------------------------------ + // Ack policy for the source. 
+ // This is the policy that the source will use to acknowledge messages + // four options: + // AckBehavior.NoAck - no acks, messages are not acknowledged [default] + // AckBehavior.AckAll - one message ack, all messages are acknowledged, acked by source on checkpoints + // AckBehavior.AllButDoNotAck - ack policy is all, but acking is left to the user + // AckBehavior.ExplicitButDoNotAck - explicit acks, messages must be acknowledged explicitly + // ------------------------------------------------------------------------------------------ + public static final AckBehavior ACK_BEHAVIOR = AckBehavior.ExplicitButDoNotAck; + + // ------------------------------------------------------------------------------------------ + // <= 0 makes the source boundedness "Boundedness.CONTINUOUS_UNBOUNDED" + // > 0 makes the source boundedness "Boundedness.BOUNDED" by giving it a maximum number of messages to read + // Try -1 or 50000 or if using an acking behavior, try 10000 + // ------------------------------------------------------------------------------------------ + public static final int MAX_MESSAGES_TO_READ = -1; + + // ========================================================================================== + // Flink Configuration: Use these settings to change how Flink runs + // ========================================================================================== + + // ------------------------------------------------------------------------------------------ + // if > 0 parallelism will be manually set to this value + // Try 3 or 1 + // ------------------------------------------------------------------------------------------ + public static final int PARALLELISM = 3; + + // ------------------------------------------------------------------------------------------ + // if > 0, how often in milliseconds to checkpoint, otherwise checkpoint will not be done + // Try 5000 or 0 + // ------------------------------------------------------------------------------------------ + 
public static final int CHECKPOINTING_INTERVAL = 5000; + + public static void main(String[] args) throws Exception { + // ========================================================================================== + // Setup + // ========================================================================================== + // Make a connection to use for setting up streams + // 1. We need data that the source will consume + // 2. We need a stream/subject for the sink to publish to + Connection nc = ExampleUtils.connect(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE); + setupSinkStream(nc); + setupDataStreams(nc); + + // ========================================================================================== + // Create a JetStream source + // ========================================================================================== + // JetStreamSubjectConfiguration are the key to building a source. + // Each subject must have its own configuration. + // The source builder can add multiple subject configurations, including + // both instances and lists of JetStreamSubjectConfiguration. + // ------------------------------------------------------------------------------------------ + // The main restriction is that all configurations for a source + // must be the same type of Boundedness. Boundedness is determined + // from the configuration of maxMessagesToRead + // ------------------------------------------------------------------------------------------ + + // ------------------------------------------------------------------------------------------ + // A single JetStreamSubjectConfiguration, one subject for the stream. + // ------------------------------------------------------------------------------------------ + // Configure the stream, its subjects, and other source behavior + // The buildWithSubject method returns an instance of JetStreamSubjectConfiguration. 
+ // Use this when you have only one subject for a given stream/configuration + // ------------------------------------------------------------------------------------------ + JetStreamSubjectConfiguration subjectConfigurationA = JetStreamSubjectConfiguration.builder() + .streamName(SOURCE_A_STREAM) + .subject(SOURCE_A_SUBJECT) + .maxMessagesToRead(MAX_MESSAGES_TO_READ) + .ackBehavior(ACK_BEHAVIOR) + .build(); + System.out.println("JetStreamSubjectConfiguration" + subjectConfigurationA.toJson()); + + // ------------------------------------------------------------------------------------------ + // A list of JetStreamSubjectConfiguration, multiple subjects for one stream. + // ------------------------------------------------------------------------------------------ + // The buildWithSubjects method returns a list of JetStreamSubjectConfiguration. + // Use this when you have multiple subjects for a given stream/configuration + // ------------------------------------------------------------------------------------------ + List subjectConfigurationsB = new ArrayList<>(); + subjectConfigurationsB.add(JetStreamSubjectConfiguration.builder() + .streamName(SOURCE_B_STREAM) + .subject(SOURCE_B_SUBJECTS[0]) + .maxMessagesToRead(MAX_MESSAGES_TO_READ) + .ackBehavior(ACK_BEHAVIOR) + .build()); + for (int x = 1; x < SOURCE_B_SUBJECTS.length; x++) { + subjectConfigurationsB.add(JetStreamSubjectConfiguration.builder() + .copy(subjectConfigurationsB.get(0)) + .subject(SOURCE_B_SUBJECTS[x]) + .build()); + } + for (JetStreamSubjectConfiguration jssc : subjectConfigurationsB) { + System.out.println("JetStreamSubjectConfiguration" + jssc.toJson()); + } + + // ------------------------------------------------------------------------------------------ + // The JetStreamSource + // ------------------------------------------------------------------------------------------ + // Build the source by setting up the connection properties, the message supplier + // and subject configurations, 
etc. + JetStreamSource source = new JetStreamSourceBuilder() + .connectionPropertiesFile(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE) + .sourceConverter(new JetStreamExplicitSourceConnector()) + .addSubjectConfigurations(subjectConfigurationA) + .addSubjectConfigurations(subjectConfigurationsB) + .build(); + + // ------------------------------------------------------------------------------------------ + // Here we write the source config out to a file in different formats. + // The files can be used in the JetStreamExampleFromConfigFiles example. + // ------------------------------------------------------------------------------------------ + writeToFile(SOURCE_CONFIG_FILE_JSON, source.toJson()); + writeToFile(SOURCE_CONFIG_FILE_YAML, source.toYaml()); + + // ========================================================================================== + // Create a JetStream sink + // ========================================================================================== + // A JetStream sink publishes to a JetStream subject + // ------------------------------------------------------------------------------------------ + // When we published to the source streams, the data was in the form "data----" + // The sink takes that payload and publishes it as the message payload + // to all the sink subjects. 
For this example, there is only one sink subject, see SINK_SUBJECT + // ------------------------------------------------------------------------------------------ + JetStreamSink sink = new JetStreamSinkBuilder() + .connectionPropertiesFile(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE) + .sinkConverter(new Utf8StringSinkConverter()) + .subjects(SINK_SUBJECT) + .build(); + + // ========================================================================================== + // Setup and start flink + // ========================================================================================== + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.STREAMING); + if (CHECKPOINTING_INTERVAL > 0) { + env.enableCheckpointing(CHECKPOINTING_INTERVAL); + } + if (PARALLELISM > 0) { + env.setParallelism(PARALLELISM); + } + + DataStream dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), JOB_NAME); + dataStream.map(new AckMapFunction()).name("Ack Messages").uid("ack-source-messages").sinkTo(sink); + + env.executeAsync(JOB_NAME); + + // ========================================================================================== + // Consume messages that the sink produces + // ========================================================================================== + // Since we are using a JetStreamSink, messages are getting published to a stream subject. 
+ // Here we will consume messages that the JetStreamSink published, to demonstrate + // that we got a message all the way from a source to this sink stream subject + // ------------------------------------------------------------------------------------------ + StreamContext sc = nc.getStreamContext(SINK_STREAM_NAME); + OrderedConsumerContext occ = sc.createOrderedConsumer( + new OrderedConsumerConfiguration().filterSubjects(SINK_SUBJECT)); + try (IterableConsumer consumer = occ.iterate()) { + long lastMessageReceived = System.currentTimeMillis() + 5000; // 5000 gives it a little time to get started + int manualTotal = 0; + Map receivedMap = new HashMap<>(); + long sinceLastMessage; + do { + Message m = consumer.nextMessage(1000); + if (m == null) { + sinceLastMessage = System.currentTimeMillis() - lastMessageReceived; + } + else { + // the extractSubject method pulls the subject out of the data string so we + // can count the number of messages published per source subject. + String data = new String(m.getData()); + String publishedSubject = Publisher.extractSubject(data); + AtomicInteger count = receivedMap.computeIfAbsent(publishedSubject, k -> new AtomicInteger()); + count.incrementAndGet(); + lastMessageReceived = System.currentTimeMillis(); + sinceLastMessage = 0; + manualTotal++; + if (REPORT_FREQUENCY > 0) { + if (manualTotal % REPORT_FREQUENCY == 0) { + ExampleUtils.reportSinkListener(receivedMap, manualTotal); + } + } + } + } while (manualTotal < SOURCES_TOTAL_MESSAGES && sinceLastMessage < QUIET_PERIOD); + + ExampleUtils.reportSinkListener(receivedMap, manualTotal); + } + + System.exit(0); // Threads are running, stuff still going, so force exit. Probably not a production strategy! 
+ } +} + diff --git a/src/examples/java/io/synadia/flink/examples/JetStreamExplicitSourceConnector.java b/src/examples/java/io/synadia/flink/examples/JetStreamExplicitSourceConnector.java new file mode 100644 index 0000000..f0d5de4 --- /dev/null +++ b/src/examples/java/io/synadia/flink/examples/JetStreamExplicitSourceConnector.java @@ -0,0 +1,29 @@ +// Copyright (c) 2023-2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.flink.examples; + +import io.nats.client.Message; +import io.nats.client.support.JsonUtils; +import io.synadia.flink.message.SourceConverter; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import java.nio.charset.StandardCharsets; + +public class JetStreamExplicitSourceConnector implements SourceConverter { + private static final long serialVersionUID = 1L; + + @Override + public String convert(Message message) { + StringBuilder stringBuilder = JsonUtils.beginJson(); + String data = new String(message.getData(), StandardCharsets.UTF_8); + JsonUtils.addField(stringBuilder, "data", data); + JsonUtils.addField(stringBuilder, "reply_to", message.getReplyTo()); + return JsonUtils.endJson(stringBuilder).toString(); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(String.class); + } +} + diff --git a/src/examples/java/io/synadia/flink/examples/support/AckMapFunction.java b/src/examples/java/io/synadia/flink/examples/support/AckMapFunction.java new file mode 100644 index 0000000..31c838d --- /dev/null +++ b/src/examples/java/io/synadia/flink/examples/support/AckMapFunction.java @@ -0,0 +1,64 @@ +// Copyright (c) 2023-2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. 
+ +package io.synadia.flink.examples.support; + +import io.nats.client.Connection; +import io.nats.client.impl.AckType; +import io.nats.client.support.JsonParser; +import io.nats.client.support.JsonValue; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FlinkRuntimeException; + +// ===================================================================================================================================== +// This class is used to simulate acknowledge messages in a Flink job that processes & acks NATS messages. +// It connects to a NATS server and sends an acknowledgment message back to the replyTo subject specified in the input JSON. +// The input JSON is expected to have a "data" field containing the message data and a "reply_to" field specifying where to send the ack. +// ===================================================================================================================================== +public class AckMapFunction extends RichMapFunction { + private static final byte[] ACK_BODY_BYTES = AckType.AckAck.bodyBytes(-1); + private transient Connection natsCtx; + + @Override + public void open(Configuration parameters) throws Exception { + try { + this.natsCtx = ExampleUtils.connect(ExampleUtils.EXAMPLES_CONNECTION_PROPERTIES_FILE); + } + catch (Exception e){ + System.out.println("Error connecting to NATS server: " + e); + throw new FlinkRuntimeException("Error connecting to NATS server", e); + } + } + + @Override + public String map(String s) throws Exception { + try { + JsonValue parsed = JsonParser.parse(s); + JsonValue data = parsed.map.get("data"); + if (data == null) { + throw new IllegalArgumentException("data field is missing in the input: " + s); + } + + JsonValue replyTo = parsed.map.get("reply_to"); + if (replyTo == null) { + throw new IllegalArgumentException("reply_to is missing in the input: " + s); + } + + 
natsCtx.publish(replyTo.string, ACK_BODY_BYTES); + return data.string; + } + catch (Exception e) { + System.out.println("Error processing message: " + e); + throw new FlinkRuntimeException("Error processing message: " + s, e); + } + } + + @Override + public void close() throws Exception { + if (natsCtx != null) { + natsCtx.close(); + } + } +} + diff --git a/src/examples/resources/js-explicit-source-config.json b/src/examples/resources/js-explicit-source-config.json new file mode 100644 index 0000000..7388c57 --- /dev/null +++ b/src/examples/resources/js-explicit-source-config.json @@ -0,0 +1,31 @@ +{ + "source_converter_class_name": "io.synadia.flink.examples.JetStreamExplicitSourceConnector", + "jetstream_subject_configurations": [ + { + "stream_name": "source-b", + "subject": "ub1", + "ack_behavior": "ExplicitButDoNotAck" + }, + { + "stream_name": "source-b", + "subject": "ub3", + "ack_behavior": "ExplicitButDoNotAck" + }, + { + "stream_name": "source-b", + "subject": "ub4", + "ack_behavior": "ExplicitButDoNotAck" + }, + { + "stream_name": "source-a", + "subject": "ua", + "ack_behavior": "ExplicitButDoNotAck" + }, + { + "stream_name": "source-b", + "subject": "ub2", + "ack_behavior": "ExplicitButDoNotAck" + } + ] +} + diff --git a/src/examples/resources/js-explicit-source-config.yaml b/src/examples/resources/js-explicit-source-config.yaml new file mode 100644 index 0000000..77ab678 --- /dev/null +++ b/src/examples/resources/js-explicit-source-config.yaml @@ -0,0 +1,19 @@ +--- +source_converter_class_name: io.synadia.flink.examples.JetStreamExplicitSourceConnector +jetstream_subject_configurations: + - stream_name: source-b + subject: ub1 + ack_behavior: ExplicitButDoNotAck + - stream_name: source-b + subject: ub3 + ack_behavior: ExplicitButDoNotAck + - stream_name: source-b + subject: ub4 + ack_behavior: ExplicitButDoNotAck + - stream_name: source-a + subject: ua + ack_behavior: ExplicitButDoNotAck + - stream_name: source-b + subject: ub2 + ack_behavior: 
ExplicitButDoNotAck + diff --git a/src/examples/resources/js-source-config.yaml b/src/examples/resources/js-source-config.yaml index c421e28..4ede6b2 100644 --- a/src/examples/resources/js-source-config.yaml +++ b/src/examples/resources/js-source-config.yaml @@ -1,6 +1,6 @@ --- source_converter_class_name: io.synadia.flink.message.Utf8StringSourceConverter -jetstream_subject_configurations: +jetstream_subject_configurations: - stream_name: source-a subject: ua - stream_name: source-b diff --git a/src/main/java/io/synadia/flink/source/AckBehavior.java b/src/main/java/io/synadia/flink/source/AckBehavior.java new file mode 100644 index 0000000..610c5f7 --- /dev/null +++ b/src/main/java/io/synadia/flink/source/AckBehavior.java @@ -0,0 +1,36 @@ +// Copyright (c) 2023-2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.flink.source; + +import java.util.HashMap; +import java.util.Map; + +public enum AckBehavior { + NoAck("NoAck"), + AckAll("AckAll"), + AllButDoNotAck("AllButDoNotAck"), + ExplicitButDoNotAck("ExplicitButDoNotAck"); + + private final String behavior; + private static final Map strEnumHash = new HashMap(); + + AckBehavior(String p) { + this.behavior = p; + } + + public String toString() { + return this.behavior; + } + + public static AckBehavior get(String value) { + return strEnumHash.get(value); + } + + static { + for(AckBehavior env : values()) { + strEnumHash.put(env.toString(), env); + } + } +} + diff --git a/src/main/java/io/synadia/flink/source/JetStreamSubjectConfiguration.java b/src/main/java/io/synadia/flink/source/JetStreamSubjectConfiguration.java index 0e0cb63..6474080 100644 --- a/src/main/java/io/synadia/flink/source/JetStreamSubjectConfiguration.java +++ b/src/main/java/io/synadia/flink/source/JetStreamSubjectConfiguration.java @@ -5,6 +5,7 @@ import io.nats.client.BaseConsumeOptions; import io.nats.client.ConsumeOptions; +import io.nats.client.api.AckPolicy; import 
io.nats.client.api.DeliverPolicy; import io.nats.client.support.*; import io.synadia.flink.utils.MiscUtils; @@ -14,6 +15,7 @@ import java.io.Serializable; import java.time.ZonedDateTime; import java.util.Map; +import java.util.Objects; import static io.nats.client.BaseConsumeOptions.DEFAULT_MESSAGE_COUNT; import static io.nats.client.BaseConsumeOptions.DEFAULT_THRESHOLD_PERCENT; @@ -34,7 +36,7 @@ public class JetStreamSubjectConfiguration implements JsonSerializable, Serializ public final long startSequence; public final ZonedDateTime startTime; public final long maxMessagesToRead; - public final boolean ackMode; + public final AckBehavior ackBehavior; public final SerializableConsumeOptions serializableConsumeOptions; public final Boundedness boundedness; @@ -47,7 +49,7 @@ private JetStreamSubjectConfiguration(Builder b, ConsumeOptions consumeOptions) startSequence = b.startSequence; startTime = b.startTime; maxMessagesToRead = b.maxMessagesToRead; - ackMode = b.ackMode; + ackBehavior = Objects.requireNonNullElse(b.ackBehavior, AckBehavior.NoAck); boundedness = maxMessagesToRead > 0 ? 
Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED; deliverPolicy = startSequence != -1 @@ -61,7 +63,7 @@ private JetStreamSubjectConfiguration(Builder b, ConsumeOptions consumeOptions) startSequence, startTime, maxMessagesToRead, - ackMode, + ackBehavior, serializableConsumeOptions.getConsumeOptions().toJson() ); } @@ -74,7 +76,11 @@ public String toJson() { JsonUtils.addField(sb, START_SEQUENCE, startSequence); JsonUtils.addField(sb, START_TIME, startTime); JsonUtils.addField(sb, MAX_MESSAGES_TO_READ, maxMessagesToRead); - JsonUtils.addFldWhenTrue(sb, ACK_MODE, ackMode); + + if (ackBehavior != AckBehavior.NoAck) { + JsonUtils.addField(sb, ACK_BEHAVIOR, ackBehavior.toString()); + } + ConsumeOptions co = serializableConsumeOptions.getConsumeOptions(); if (co.getBatchSize() != DEFAULT_MESSAGE_COUNT) { JsonUtils.addField(sb, BATCH_SIZE, co.getBatchSize()); @@ -92,7 +98,11 @@ public String toYaml(int indentLevel) { YamlUtils.addField(sb, indentLevel, START_SEQUENCE, startSequence); YamlUtils.addField(sb, indentLevel, START_TIME, startTime); YamlUtils.addField(sb, indentLevel, MAX_MESSAGES_TO_READ, maxMessagesToRead); - YamlUtils.addFldWhenTrue(sb, indentLevel, ACK_MODE, ackMode); + + if (ackBehavior != AckBehavior.NoAck) { + YamlUtils.addField(sb, indentLevel, ACK_BEHAVIOR, ackBehavior.toString()); + } + ConsumeOptions co = serializableConsumeOptions.getConsumeOptions(); if (co.getBatchSize() != DEFAULT_MESSAGE_COUNT) { YamlUtils.addFieldGtZero(sb, indentLevel, BATCH_SIZE, co.getBatchSize()); @@ -129,7 +139,7 @@ public static JetStreamSubjectConfiguration fromJsonValue(JsonValue jv) { .maxMessagesToRead(JsonValueUtils.readLong(jv, MAX_MESSAGES_TO_READ, -1)) .batchSize(JsonValueUtils.readInteger(jv, BATCH_SIZE, -1)) .thresholdPercent(JsonValueUtils.readInteger(jv, THRESHOLD_PERCENT, -1)) - .ackMode(JsonValueUtils.readBoolean(jv, ACK_MODE, false)) + .ackBehavior(AckBehavior.valueOf(JsonValueUtils.readString(jv, ACK_BEHAVIOR, AckBehavior.NoAck.toString()))) .build(); 
} @@ -142,7 +152,7 @@ public static JetStreamSubjectConfiguration fromMap(Map map) { .maxMessagesToRead(YamlUtils.readLong(map, MAX_MESSAGES_TO_READ, -1)) .batchSize(YamlUtils.readInteger(map, BATCH_SIZE, -1)) .thresholdPercent(YamlUtils.readInteger(map, THRESHOLD_PERCENT, -1)) - .ackMode(YamlUtils.readBoolean(map, ACK_MODE, false)) + .ackBehavior(AckBehavior.get(YamlUtils.readString(map, ACK_BEHAVIOR, AckPolicy.None.toString()))) .build(); } @@ -152,7 +162,7 @@ public static class Builder { private long startSequence = -1; private ZonedDateTime startTime; private long maxMessagesToRead = -1; - private boolean ackMode = false; + private AckBehavior ackBehavior = AckBehavior.NoAck; private int batchSize = -1; private int thresholdPercent = -1; @@ -166,7 +176,7 @@ public Builder copy(JetStreamSubjectConfiguration config) { .startSequence(config.startSequence) .startTime(config.startTime) .maxMessagesToRead(config.maxMessagesToRead) - .ackMode(config.ackMode) + .ackBehavior(config.ackBehavior) .batchSize(config.serializableConsumeOptions.getConsumeOptions().getBatchSize()) .thresholdPercent(config.serializableConsumeOptions.getConsumeOptions().getThresholdPercent()); } @@ -257,26 +267,35 @@ public Builder maxMessagesToRead(long maxMessagesToRead) { } /** - * Set that the stream is a work queue and messages must be acked. + * Sets the ack behavior to AckBehavior.NoAck for the consumer. + * @return the builder + */ + public Builder ackBehavior() { + return ackBehavior(AckBehavior.NoAck); + } + + /** + * Sets the ack behavior for the consumer. * Ack will occur when a checkpoint is complete via ack all. - * It's not recommended to set ackMode unless your stream is a work queue, + * AckBehavior.NoAck is the default. AckBehavior.AckAll is slower than NoAck. AckBehavior.ExplicitButDoNotAck is the slowest. + * It is not recommended to use an acking behavior unless your stream is a work queue, * but even then, be sure of why you are running this against a work queue. 
+ * @param ackBehavior the ack behavior or null for AckBehavior.NoAck * @return the builder */ - public Builder ackMode() { - this.ackMode = true; + public Builder ackBehavior(AckBehavior ackBehavior) { + this.ackBehavior = ackBehavior == null ? AckBehavior.NoAck : ackBehavior; return this; } /** - * Set whether to be in ack mode queue where must be acked. - * Ack will occur when a checkpoint is complete via ack all. - * This is MUCH slower and is not recommended to set ackMode - * unless your stream is a work queue, but even then, - * be sure of why you are running this against a work queue. + * @deprecated Use {@link #ackBehavior(AckBehavior)} instead. + * ackMode false sets the behavior to AckBehavior.NoAck. + * ackMode true sets the behavior to AckBehavior.AckAll. */ + @Deprecated public Builder ackMode(boolean ackMode) { - this.ackMode = ackMode; + this.ackBehavior = ackMode ? AckBehavior.AckAll : AckBehavior.NoAck; return this; } diff --git a/src/main/java/io/synadia/flink/source/reader/JetStreamSourceReader.java b/src/main/java/io/synadia/flink/source/reader/JetStreamSourceReader.java index e90c779..205f60c 100644 --- a/src/main/java/io/synadia/flink/source/reader/JetStreamSourceReader.java +++ b/src/main/java/io/synadia/flink/source/reader/JetStreamSourceReader.java @@ -11,6 +11,7 @@ import io.nats.client.impl.AckType; import io.nats.client.support.SerializableConsumeOptions; import io.synadia.flink.message.SourceConverter; +import io.synadia.flink.source.AckBehavior; import io.synadia.flink.source.split.JetStreamSplit; import io.synadia.flink.source.split.JetStreamSplitMessage; import io.synadia.flink.utils.ConnectionContext; @@ -35,7 +36,7 @@ * INTERNAL CLASS SUBJECT TO CHANGE */ @Internal -public class JetStreamSourceReader implements SourceReader { +public class JetStreamSourceReader implements SourceReader { private static final byte[] ACK_BODY_BYTES = AckType.AckAck.bodyBytes(-1); private final boolean bounded; @@ -141,9 +142,8 @@ public void addSplits(List
splits) { if (!splitMap.containsKey(split.splitId()) && !split.finished.get()) { try { StreamContext sc = connectionFactory.getConnectionContext().js.getStreamContext(split.subjectConfig.streamName); - BaseConsumerContext consumerContext = split.subjectConfig.ackMode - ? createConsumer(split, sc) - : createOrderedConsumer(split, sc); + BaseConsumerContext consumerContext = split.subjectConfig.ackBehavior== AckBehavior.NoAck + ? createOrderedConsumer(split, sc) : createConsumer(split, sc); SerializableConsumeOptions sco = split.subjectConfig.serializableConsumeOptions; ConsumeOptions consumeOptions = sco == null ? DEFAULT_CONSUME_OPTIONS : sco.getConsumeOptions(); @@ -164,8 +164,18 @@ public void addSplits(List splits) { private BaseConsumerContext createConsumer(JetStreamSplit split, StreamContext sc) throws JetStreamApiException, IOException { ConsumerConfiguration.Builder b = ConsumerConfiguration.builder() - .ackPolicy(AckPolicy.All) .filterSubject(split.subjectConfig.subject); + + if (split.subjectConfig.ackBehavior == AckBehavior.AllButDoNotAck || split.subjectConfig.ackBehavior == AckBehavior.AckAll) { + b.ackPolicy(AckPolicy.All); + } + else if (split.subjectConfig.ackBehavior == AckBehavior.ExplicitButDoNotAck) { + b.ackPolicy(AckPolicy.Explicit); + } + else { + throw new IllegalArgumentException("Unsupported ack behavior: " + split.subjectConfig.ackBehavior); + } + long lastSeq = split.lastEmittedStreamSequence.get(); if (lastSeq > 0) { b.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(lastSeq + 1); @@ -210,7 +220,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { ConnectionContext connectionContext = getConnectionContext(); for (JetStreamSourceReaderSplit srSplit : splitMap.values()) { JetStreamSourceReaderSplit.Snapshot snapshot = srSplit.removeSnapshot(checkpointId); - if (snapshot != null && srSplit.split.subjectConfig.ackMode) { + if (snapshot != null && srSplit.split.subjectConfig.ackBehavior == 
AckBehavior.AckAll) { // Manual ack since we don't have the message. // Use the original message's "reply_to" since this is where the ack info is kept. // Also, we execute as a task so as not to slow down the reader diff --git a/src/main/java/io/synadia/flink/utils/Constants.java b/src/main/java/io/synadia/flink/utils/Constants.java index d2233c7..6bccd63 100644 --- a/src/main/java/io/synadia/flink/utils/Constants.java +++ b/src/main/java/io/synadia/flink/utils/Constants.java @@ -27,7 +27,7 @@ public interface Constants { String START_SEQUENCE = "start_sequence"; String START_TIME = "start_time"; String MAX_MESSAGES_TO_READ = "max_messages_to_read"; - String ACK_MODE = "ack_mode"; + String ACK_BEHAVIOR = "ack_behavior"; String BATCH_SIZE = "batch_size"; String THRESHOLD_PERCENT = "threshold_percent"; diff --git a/src/test/java/io/synadia/flink/source/JetStreamSourceBuilderTest.java b/src/test/java/io/synadia/flink/source/JetStreamSourceBuilderTest.java index f028afc..fc33700 100644 --- a/src/test/java/io/synadia/flink/source/JetStreamSourceBuilderTest.java +++ b/src/test/java/io/synadia/flink/source/JetStreamSourceBuilderTest.java @@ -94,7 +94,7 @@ else if (which.equals("Endless")) { JetStreamSubjectConfiguration.builder() .streamName("StreamAM") .subject("SubjectAM") - .ackMode() + .ackBehavior() .build(), JetStreamSubjectConfiguration.builder() .streamName("StreamMany")