Refactor ackMode to AckBehavior #67
Conversation
So I did this review and was on board at first, but I don't see anywhere that you handle Explicit. It cannot be handled like All.
Explicit is going to be brutally slow. I think this is a terrible idea. I still have not seen any note from you on actual use cases where this is important, but I guess it's not a problem to allow it.
@scottf Thank you for reviewing the PR. I have resolved all the comments. I agree that switching the ack policy from none to explicit will introduce some performance overhead. However, there are valid use cases for this, such as long-running workflows where message ordering isn't critical but ensuring each message is successfully processed (or retried on failure) is essential.
Please continue to elaborate. Maybe Flink is not the correct solution for this type of operation.
```diff
 for (JetStreamSourceReaderSplit srSplit : splitMap.values()) {
     JetStreamSourceReaderSplit.Snapshot snapshot = srSplit.removeSnapshot(checkpointId);
-    if (snapshot != null && srSplit.split.subjectConfig.ackMode) {
+    if (snapshot != null && srSplit.split.subjectConfig.ackPolicy.equals(AckPolicy.All)) {
```
How does this ack for Explicit? Explicit will need an entirely different handler; it will need to either ack every time a message comes in, or maintain a list of messages to ack.
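The "maintain a list of messages to ack" idea could be sketched roughly as below. This is only an illustration of the bookkeeping, not connector code: `PendingAcks`, `markDelivered`, and `ackUpTo` are hypothetical names, and in the real reader the `Runnable` would be the NATS message's own `ack()` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: per-message ack tracking for AckPolicy.Explicit.
public class PendingAcks {
    // stream sequence -> ack action (e.g. msg::ack in a real reader)
    private final ConcurrentSkipListMap<Long, Runnable> pending = new ConcurrentSkipListMap<>();

    // Called when the source emits a message downstream.
    public void markDelivered(long streamSeq, Runnable ackAction) {
        pending.put(streamSeq, ackAction);
    }

    // Ack every tracked message up to and including maxSeq,
    // e.g. when a checkpoint completes. Returns the acked sequences.
    public List<Long> ackUpTo(long maxSeq) {
        List<Long> acked = new ArrayList<>();
        var head = pending.headMap(maxSeq, true);
        for (var e : head.entrySet()) {
            e.getValue().run();
            acked.add(e.getKey());
        }
        head.clear(); // removes the acked entries from the backing map
        return acked;
    }

    public int pendingCount() {
        return pending.size();
    }
}
```

Either way, the handler has to do more work than the AckPolicy.All path, which only acks a single message per checkpoint.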
Yes, that's the idea. The source shouldn't perform the ack; instead, a downstream operator in the processing chain should handle it. If that operator doesn't acknowledge the message, it will be automatically redelivered.
This makes a lot of assumptions, and a message could be redelivered. Where is this "downstream"? Is it some sink that takes a message, or data with the reply-to, with the source built accordingly?
How is the checkpoint supposed to track this?
I'm still waiting for a real use case that explains this and makes sense to implement in Flink. From what I understand, you want the distributed-ness of Flink nodes and parallelization, but then you want to do things outside of Flink.
What if you just built normal apps and scaled them horizontally? Do you have some non-NATS source or sink in the process?
I'm just not yet convinced of this behavior being added to the codebase without building in the ack solution.
When I refer to downstream, I mean everything that follows the source: the Flink operators responsible for transforming or processing data before reaching the sink.
"How is the checkpoint supposed to track this?"
In the current main branch implementation, the source tracks and acknowledges only the last emitted sequence number during checkpoints, using AckPolicy.All. For example, if messages 5 through 10 are processed and checkpointed at 10, and we later resume processing and reach message 12, recovery would reprocess messages 11 and 12. This is expected and aligns with standard fault-tolerance behavior.
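The recovery arithmetic described here can be modeled in a few lines. `CheckpointModel` below is an illustrative stand-in, not connector code; it just captures the "ack the last checkpointed sequence, resume at seq + 1" rule:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the AckPolicy.All checkpoint behavior:
// only the last checkpointed sequence is acked, so on recovery the
// source resumes at lastCheckpointedSeq + 1 and later messages replay.
public class CheckpointModel {
    public static long resumeSequence(long lastCheckpointedSeq) {
        return lastCheckpointedSeq + 1;
    }

    // Sequences delivered again after a crash at crashedAtSeq,
    // given the last completed checkpoint.
    public static List<Long> redelivered(long lastCheckpointedSeq, long crashedAtSeq) {
        List<Long> out = new ArrayList<>();
        for (long s = lastCheckpointedSeq + 1; s <= crashedAtSeq; s++) {
            out.add(s);
        }
        return out;
    }
}
```

With the example from the text, a checkpoint at 10 and a crash at 12 means messages 11 and 12 are reprocessed: at-least-once delivery.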
Now, please consider a case where message 7 fails during processing, between messages 5 and 10. Since the source has already acknowledged up to 10, this failure goes undetected by NATS. The assumption here is that the Flink operators are responsible for retaining and recovering such messages.
With AckPolicy.Explicit, the behavior remains similar.
In this mode, a Flink operator is responsible for explicitly acknowledging each message. If certain messages fail and remain unacknowledged, NATS ensures reliability by redelivering them, up to the configured unacknowledged-message limit.
If integrating this with Flink's checkpointing feels unnecessarily complex, a straightforward solution would be to disable checkpointing when using explicit acknowledgments.
"I'm still waiting for a real use case that explains this and makes sense to implement in Flink. From what I understand, you want the distributed-ness of Flink nodes and parallelization, but then you want to do things outside of Flink."
All of this happens within the Flink operators; nothing occurs after the sink, which simply acts as an output destination.
"What if you just built normal apps and scaled them horizontally? Do you have some non-NATS source or sink in the process?"
We're using a NATS source, and the sink is a non-NATS sink.
Flink enables parallel data processing through operators in DAGs, where data flows from sources to sinks. In certain scenarios, it's preferable not to acknowledge (ack) messages immediately at the source. Instead, the ack should be deferred until a specific operator within the DAG completes processing. To handle such cases, an explicit acknowledgment mechanism is needed, where the source holds off on acking and a downstream operator sends the acknowledgment by publishing it to the reply_to subject. If the operator doesn't ack within a specified timeout, the message is considered unprocessed and is redelivered for retry.
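A minimal sketch of that deferred-ack operator, assuming the source forwards each message's reply-to subject alongside the payload. `AckOperator` and its `Publisher` interface are hypothetical stand-ins for a Flink map function and a NATS connection; they are not part of the connector:

```java
// Hedged sketch: a downstream operator that acks a JetStream message
// by publishing to its reply_to subject after processing succeeds.
public class AckOperator {
    // Stand-in for a NATS connection's publish capability.
    public interface Publisher {
        void publish(String subject, byte[] body);
    }

    private final Publisher publisher;

    public AckOperator(Publisher publisher) {
        this.publisher = publisher;
    }

    // Returns true if an ack was sent; a missing replyTo means
    // there is nothing to ack (e.g. the source did not defer the ack).
    public boolean process(String payload, String replyTo) {
        // ... real operator logic would transform the payload here ...
        if (replyTo == null || replyTo.isEmpty()) {
            return false;
        }
        // Publishing an empty body to the reply subject acknowledges the message.
        publisher.publish(replyTo, new byte[0]);
        return true;
    }
}
```

If the operator fails before `publish`, no ack is sent and the server redelivers the message after the ack wait expires, which is the retry behavior described above.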
The more you point this out, the more I don't think this is how a Flink source should operate. It is self-contained. Its job is to make sure it outputs each item, in our case a message. That's it. This is why I think that there should never be any acking, as long as the source tracks what messages it has handled and can restart. Is there an example of some Flink sources and sinks that do what you are trying to do?

It seems to me that you just want to confirm a message has been processed by the sink. Fine, but that's a source issue. So do this: write your own SourceConverter to output things like the message, headers, and the message metadata. Your sink can be connected to any backend. Maybe you record knowledge of receipt of the message, and then record the processing or completion of that message... to a cache, another stream, a database, or even delete it from the stream directly via JetStreamManagement.
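One way to read this suggestion is a converter output record that carries the payload together with headers and stream metadata, so any sink backend can record receipt and completion on its own. The record shape below is illustrative only; the connector's actual SourceConverter interface and field names may differ.

```java
import java.util.Map;

// Hypothetical converter output: everything a sink would need to
// track or act on a sourced JetStream message.
public record SourcedMessage(
        byte[] payload,
        Map<String, String> headers,
        String subject,
        long streamSequence,   // enables deletion via JetStreamManagement if desired
        String replyTo) {      // enables a later ack by publishing to this subject

    public static SourcedMessage of(byte[] payload, Map<String, String> headers,
                                    String subject, long seq, String replyTo) {
        return new SourcedMessage(payload, headers, subject, seq, replyTo);
    }
}
```

A sink receiving this record could, for instance, write `streamSequence` to a database on receipt and again on completion, keeping all the tracking outside the source itself.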
You're right, the simplicity and self-contained nature of the source is a good design choice, and I really respect that perspective. That said, I believe there's room to build on that foundation. By optionally enabling acknowledgment, we're not altering the core logic but simply extending its usefulness for more complex or mission-critical use cases. It adds a layer of flexibility and observability that can be incredibly valuable, especially in environments where confirmation of processing is essential, while still preserving the clean experience for users who don't need that level of control.
How about this? Change the PR to accept different modes via an enum, i.e.
The default is
Please update the comment around the old API and deprecate it similar to this
|
I've made the necessary changes and added support for AckBehavior. Thank you for taking the time to review and guide me through the PR. Please do let me know if there's anything else I should address regarding comments or the API. Really appreciate your support! 😄
Looks good overall, some minor changes, no functional changes though.
One other note. I went back and read the PR top comment and saw "uses durable consumers". But it does not use durable consumers. See JetStreamSourceReader.createConsumer.
So 2 things in regard to that.
- Update your comment.
- Maybe we can have another discussion about that, but I don't see the need for durable consumers because we'd have to clean them up, and they aren't necessary on a failure since we track the last sourced stream sequence and start over at seq + 1
My mistake; you're right, they aren't durable consumers. I've updated the description and comments accordingly.
scottf left a comment:
LGTM
This PR introduces a more expressive and flexible acknowledgment configuration for the Flink-NATS integration by replacing the legacy boolean ackMode with a strongly-typed AckBehavior. This change enables finer-grained control over message acknowledgment semantics, while preserving existing behaviors and adding support for new use cases.
Changes Made
Replaced the ackMode boolean with a new enum-style AckBehavior supporting four distinct modes:
Backward Compatibility
Problem Solved
Supports advanced scenarios where applications want full control over acknowledgment logic.
Fixes: #66