
Conversation


@tilakraj94 tilakraj94 commented Jun 19, 2025

This PR introduces a more expressive and flexible acknowledgment configuration for the Flink-NATS integration by replacing the legacy boolean ackMode with a strongly-typed AckBehavior. This change enables finer-grained control over message acknowledgment semantics, while preserving existing behaviors and adding support for new use cases.

Changes Made

Replaced the ackMode boolean with a new enum-style AckBehavior supporting four distinct modes:

  1. NoAck – Equivalent to ackMode: false; uses ephemeral ordered consumers with no acknowledgment.
  2. AckAll – Equivalent to ackMode: true; uses regular consumers with AckPolicy.All, and messages are acknowledged only during checkpointing.
  3. AllButDoNotAck – Uses regular consumers with AckPolicy.All, but disables automatic checkpoint-based acknowledgment. Useful when the application wants checkpoint integration but will handle acks separately or not at all.
  4. ExplicitButDoNotAck – Uses regular consumers with AckPolicy.Explicit, requiring the application to explicitly acknowledge each message. Checkpointing is decoupled from acknowledgment.
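
As a rough illustration, the four modes could be modeled along these lines. This is a hedged sketch only; the field names and the String-based ack policy values are illustrative stand-ins, not necessarily the connector's actual implementation.

```java
// Sketch of the four acknowledgment modes. Field names and the String-based
// ack policy are illustrative stand-ins, not the connector's real types.
enum AckBehavior {
    NoAck(true, null, false),                      // ephemeral ordered consumer, no acks
    AckAll(false, "All", true),                    // regular consumer, acked at checkpoint
    AllButDoNotAck(false, "All", false),           // regular consumer, app handles acks
    ExplicitButDoNotAck(false, "Explicit", false); // app must ack each message

    final boolean orderedConsumer; // ordered (ephemeral) consumer vs regular consumer
    final String ackPolicy;        // JetStream ack policy name, null when unused
    final boolean ackOnCheckpoint; // whether the source acks during checkpointing

    AckBehavior(boolean orderedConsumer, String ackPolicy, boolean ackOnCheckpoint) {
        this.orderedConsumer = orderedConsumer;
        this.ackPolicy = ackPolicy;
        this.ackOnCheckpoint = ackOnCheckpoint;
    }
}
```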

Backward Compatibility

  1. AckAll behaves exactly like the previous ackMode: true.
  2. NoAck behaves like the previous ackMode: false.

Problem Solved

Supports advanced scenarios where applications want full control over acknowledgment logic.

Fixes: #66


@scottf scottf left a comment


So I did this review and was on board at first. But I don't see anywhere that you handle Explicit. It cannot be handled like All.
Explicit is going to be brutally slow. I think this is a terrible idea. I still have not seen any note from you on actual use cases where this is important, but I guess it's not a problem to allow it.


tilakraj9560 commented Jun 24, 2025

@scottf Thank you for reviewing the PR. I have resolved all the comments. I agree that switching the ack policy from none to explicit will introduce some performance overhead. However, there are valid use cases for this, such as long-running workflows where message ordering isn't critical but ensuring each message is successfully processed (or retried on failure) is essential.


scottf commented Jun 26, 2025

> @scottf Thank you for reviewing the PR. I have resolved all the comments. I agree that switching the ack policy from none to explicit will introduce some performance overhead. However, there are valid use cases for this, such as long-running workflows where message ordering isn't critical, but ensuring each message is successfully processed (or retried on failure) is essential.

Please continue to elaborate. Maybe Flink is not the correct solution for this type of operation.

    for (JetStreamSourceReaderSplit srSplit : splitMap.values()) {
        JetStreamSourceReaderSplit.Snapshot snapshot = srSplit.removeSnapshot(checkpointId);
    -   if (snapshot != null && srSplit.split.subjectConfig.ackMode) {
    +   if (snapshot != null && srSplit.split.subjectConfig.ackPolicy.equals(AckPolicy.All)) {
Collaborator


How does this ack for Explicit? Explicit will need an entirely different handler: it will need to either ack every message as it comes in, or maintain a list of messages to ack.
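
The per-message bookkeeping described here could look roughly like the following pure-Java sketch; ExplicitAckTracker and the LongConsumer callback are hypothetical stand-ins, not the connector's API.

```java
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.LongConsumer;

// Hypothetical sketch: with AckPolicy.Explicit the reader can no longer ack a
// single floor sequence, so it must remember every emitted sequence and ack
// each one individually when a checkpoint completes.
class ExplicitAckTracker {
    private final NavigableSet<Long> pending = new TreeSet<>();

    void onEmitted(long streamSeq) {
        pending.add(streamSeq);
    }

    // Acks (and forgets) every pending sequence at or below the checkpointed
    // one. The ackFn callback stands in for acking the actual NATS message.
    void onCheckpoint(long checkpointedSeq, LongConsumer ackFn) {
        SortedSet<Long> done = pending.headSet(checkpointedSeq, true);
        for (long seq : done) {
            ackFn.accept(seq);
        }
        done.clear(); // clearing the view also removes from the backing set
    }

    int pendingCount() {
        return pending.size();
    }
}
```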


@tilakraj9560 tilakraj9560 Jul 1, 2025


Yes, that's the idea. The source shouldn't perform the ack; instead, a downstream operator in the processing chain should handle it. If that operator doesn't acknowledge the message, it will be automatically redelivered.

#67 (comment)


@scottf scottf Jul 1, 2025


This makes a lot of assumptions, and a message could be redelivered. Where is this "downstream"? Is it some sink that takes a message or data with the reply-to, with the source built accordingly?

How is the checkpoint supposed to track this?

I'm still waiting for a real use case that explains why this makes sense to implement in Flink. From what I understand, you want the distributedness of Flink nodes and parallelization, but then you want to do things outside of Flink.
What if you just built normal apps and scaled them horizontally? Do you have some non-NATS source or sink in the process?

I'm just not yet convinced of this behavior being added to the codebase without building in the ack solution.


@tilakraj9560 tilakraj9560 Jul 4, 2025


When I refer to downstream, I mean everything that follows the source: the Flink operators responsible for transforming or processing data before it reaches the sink.

> How is the checkpoint supposed to track this?

In the current main branch implementation, the source tracks and acknowledges only the last emitted sequence number during checkpoints, using AckPolicy.All. For example, if messages 5 through 10 are processed and checkpointed at 10, and processing later fails after reaching message 12, recovery would reprocess messages 11 and 12. This is expected and aligns with standard fault-tolerance behavior.

Now consider a case where message 7 fails during processing between messages 5 and 10. Since the source has already acknowledged up to 10, this failure will go undetected by NATS. The assumption here is that the Flink operators are responsible for retaining and recovering such messages.

With AckPolicy.Explicit, the behavior remains similar.

In this mode, a Flink operator is responsible for explicitly acknowledging each message. If certain messages fail and remain unacknowledged, NATS ensures reliability by redelivering them, up to the configured unacknowledged-message limit.

If integrating this with Flink’s checkpointing feels unnecessarily complex, a straightforward solution would be to disable checkpointing when using explicit acknowledgments.

> I'm still waiting for a real use case that explains this that makes sense to implement in flink. From what I understand, you want the distributed-ness of flink nodes and parallelization, but then you want to do things outside of flink.

All of this is happening within the Flink operators; nothing occurs after the sink, as it simply acts as an output destination.

> What if you just built normal apps and scale them horizontally? Do you have some non-NATS source or sink in the process.

We’re using a NATS source, and the sink is a non-NATS sink.

@tilakraj9560

> @scottf Thank you for reviewing the PR. I have resolved all the comments. I agree that switching the ack policy from none to explicit will introduce some performance overhead. However, there are valid use cases for this, such as long-running workflows where message ordering isn’t critical, but ensuring each message is successfully processed (or retried on failure) is essential.
>
> Please continue to elaborate. Maybe Flink is not the correct solution for this type of operation.

Flink enables parallel data processing through operators arranged in a DAG, where data flows from sources to sinks. In certain scenarios, it’s preferable not to acknowledge (ack) messages immediately at the source. Instead, the ack should be deferred until a specific operator within the DAG completes processing.

To handle such cases, an explicit acknowledgment mechanism is needed, where the source holds off on acking and a downstream operator sends the acknowledgment by publishing it to the reply_to subject. If the operator doesn’t ack within a specified timeout, the message is considered unprocessed and is redelivered for retry.
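
The deferred-ack flow just described could be sketched as follows. This is a hedged pure-Java illustration: Publisher, DeferredAckProcessor, and the "+ACK" payload are stand-ins for the real NATS client pieces, not the connector's actual classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a NATS publisher; a real implementation would use
// the NATS client connection instead.
interface Publisher {
    void publish(String subject, byte[] payload);
}

// Sketch of deferring the ack: the source records each message's reply-to
// subject, and a downstream operator acknowledges by publishing to it.
// Messages never acked this way would be redelivered by NATS after the
// consumer's ack-wait timeout.
class DeferredAckProcessor {
    private final Publisher publisher;
    private final Map<Long, String> pendingReplyTo = new HashMap<>();

    DeferredAckProcessor(Publisher publisher) {
        this.publisher = publisher;
    }

    // Called by the source when a message is emitted; no ack is sent here.
    void onMessage(long streamSeq, String replyTo) {
        pendingReplyTo.put(streamSeq, replyTo);
    }

    // Called by the downstream operator once processing succeeds.
    void ack(long streamSeq) {
        String replyTo = pendingReplyTo.remove(streamSeq);
        if (replyTo != null) {
            publisher.publish(replyTo, "+ACK".getBytes(StandardCharsets.UTF_8));
        }
    }

    int pendingCount() {
        return pendingReplyTo.size();
    }
}
```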


scottf commented Jul 1, 2025

> In certain scenarios, it’s preferable not to acknowledge (ack) messages immediately at the source.
What scenarios? I think you mean do not acknowledge until a sink processes the message.

The more you point this out, the more I don't think this is how a Flink source should operate. It is self-contained. Its job is to make sure it outputs each item, in our case a message. That's it. This is why I think that there should never be any acking, as long as the source tracks which messages it has handled and can restart.

Is there an example of some Flink sources and sinks that do what you are trying to do?

It seems to me that you just want to confirm a message has been processed by the sink. Fine, but that's a source issue.

So do this: write your own SourceConverter to output things like the message, headers, and the message metadata. Your sink can be connected to any backend. Maybe you record receipt of the message, and then record the processing of that message being completed, to a cache, another stream, or a database, or even delete it from the stream directly via JetStreamManagement's boolean deleteMessage(String streamName, long seq).
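
As a rough sketch of this suggestion: StreamAdmin below is a hypothetical stand-in for jnats' JetStreamManagement, modeling only the deleteMessage(String, long) signature the comment refers to; CompletionRecorder and its method names are illustrative.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for jnats' JetStreamManagement; only the
// deleteMessage signature mentioned in the comment is modeled.
interface StreamAdmin {
    boolean deleteMessage(String streamName, long seq);
}

// Sketch of the suggested bookkeeping: record receipt of each message, then
// on completion delete it from the stream directly instead of acking it.
class CompletionRecorder {
    private final StreamAdmin admin;
    private final String streamName;
    private final Set<Long> received = new HashSet<>();

    CompletionRecorder(StreamAdmin admin, String streamName) {
        this.admin = admin;
        this.streamName = streamName;
    }

    void onReceived(long seq) {
        received.add(seq);
    }

    // Marks the message completed and removes it from the stream.
    boolean onCompleted(long seq) {
        received.remove(seq);
        return admin.deleteMessage(streamName, seq);
    }

    int inFlight() {
        return received.size();
    }
}
```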

@tilakraj9560

You’re right, the simplicity and self-contained nature of the source is a good design choice, and I really respect that perspective. That said, I believe there’s room to build on that foundation. By optionally enabling acknowledgment, we’re not altering the core logic but simply extending its usefulness for more complex or mission-critical use cases. It adds a layer of flexibility and observability that can be incredibly valuable in environments where confirmation of processing is essential, while still preserving the clean experience for users who don’t need that level of control.


scottf commented Jul 4, 2025

How about this? Change the PR to accept different modes via an enum, i.e. AckBehavior:

  1. NoAck same as ackMode(false) (ordered consumer)
  2. AckAll same as ackMode(true) (regular consumer with AckPolicy.All)
  3. AllButDoNotAck (regular consumer with AckPolicy.All)
  4. ExplicitButDoNotAck (regular consumer with AckPolicy.Explicit)

The default is NoAck. Please comment the enum well. The API must remain backward compatible, so map

  • ackMode(false) to NoAck
  • ackMode(true) to AckAll

Please update the comment around the old API and deprecate it, similar to this:

/**
 * @deprecated Use {@link #ackBehavior(AckBehavior)} instead.
 * Blah blah blah
 * @param ...
 * @return blah blah
 */
@Deprecated
public Builder ackMode(boolean ackMode);
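
The requested mapping could be sketched like this; the Builder shape and getter are illustrative, not the connector's exact class.

```java
// Illustrative enum matching the four modes requested above.
enum AckBehavior { NoAck, AckAll, AllButDoNotAck, ExplicitButDoNotAck }

// Sketch of a backward-compatible builder: the deprecated boolean setter
// simply delegates to the new enum-based one.
class Builder {
    private AckBehavior ackBehavior = AckBehavior.NoAck; // default is NoAck

    /**
     * @deprecated Use {@link #ackBehavior(AckBehavior)} instead.
     * @param ackMode true maps to AckAll, false maps to NoAck
     * @return this builder
     */
    @Deprecated
    public Builder ackMode(boolean ackMode) {
        return ackBehavior(ackMode ? AckBehavior.AckAll : AckBehavior.NoAck);
    }

    public Builder ackBehavior(AckBehavior ackBehavior) {
        this.ackBehavior = ackBehavior;
        return this;
    }

    AckBehavior getAckBehavior() {
        return ackBehavior;
    }
}
```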

@tilakraj94 tilakraj94 requested a review from scottf July 8, 2025 17:09
@tilakraj94
Collaborator Author

I’ve made the necessary changes and added support for AckBehavior. Thank you for taking the time to review and guide me through the PR.

Please do let me know if there’s anything else I should address regarding comments or the API.

Really appreciate your support! 😄

@tilakraj94 tilakraj94 changed the title from "Refactor ackMode to AckPolicy" to "Refactor ackMode to AckBehavior" Jul 8, 2025
scottf previously requested changes Jul 8, 2025

@scottf scottf left a comment


Looks good overall, some minor changes, no functional changes though.
One other note. I went back and read the PR top comment and saw

> uses durable consumers

But it does not use durable consumers. See JetStreamSourceReader.createConsumer

So 2 things in regard to that.

  1. Update your comment.
  2. Maybe we can have another discussion about that, but I don't see the need for durable consumers because we'd have to clean them up, and they aren't necessary on a failure since we track the last sourced stream sequence and start over at seq + 1

@tilakraj94
Collaborator Author

> Looks good overall, some minor changes, no functional changes though. One other note. I went back and read the PR top comment and saw
>
> > uses durable consumers
>
> But it does not use durable consumers. See JetStreamSourceReader.createConsumer
>
> So 2 things in regard to that.
>
> 1. Update your comment.
> 2. Maybe we can have another discussion about that, but I don't see the need for durable consumers because we'd have to clean them up, and they aren't necessary on a failure since we track the last sourced stream sequence and start over at seq + 1

My mistake; you’re right, they aren’t durable consumers. I’ve updated the description and comments accordingly.

@scottf scottf dismissed their stale review July 9, 2025 19:31

Changes done.


@scottf scottf left a comment


LGTM

@scottf scottf merged commit 43cb642 into synadia-io:main Jul 9, 2025
1 check passed
Development

Successfully merging this pull request may close these issues.

Work queue doesn't support AckPolicy.All
