-
Notifications
You must be signed in to change notification settings - Fork 81
add initial offset to kafka source spec #779
add initial offset to kafka source spec #779
Conversation
Codecov Report
@@ Coverage Diff @@
## main #779 +/- ##
==========================================
+ Coverage 75.62% 75.98% +0.36%
==========================================
Files 142 144 +2
Lines 6445 6518 +73
==========================================
+ Hits 4874 4953 +79
+ Misses 1350 1337 -13
- Partials 221 228 +7
Continue to review full report at Codecov.
|
@@ -38,6 +39,7 @@ func TestMakeReceiveAdapter(t *testing.T) { | |||
Spec: v1beta1.KafkaSourceSpec{ | |||
Topics: []string{"topic1,topic2"}, | |||
ConsumerGroup: "group", | |||
InitialOffset: ptr.Int64(-2), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's -2
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
earliest, i will change to use the constants from sarama library
Isn't the official kafka (jvm) client just doing |
@@ -72,6 +72,10 @@ type KafkaSourceSpec struct { | |||
// +optional | |||
ConsumerGroup string `json:"consumerGroup,omitempty"` | |||
|
|||
// InitialOffset is the Initial Offset for the consumer group. | |||
// +optional | |||
InitialOffset *int64 `json:"initialOffset,omitempty"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not use sarama specific things in the API.
can we use earliest
or latest
as string?
/hold
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how to give a specific offset in that case ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you plan to set a single offset for all partitions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or, let me ask another question, as of now, if I set 50 here, what should happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, I had a wrong assumption that we could also give a specific offset.
But looks like it needs to be earliest or latest. I will change the API to make the initial offset a string that will accept "earliest" or "latest"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i have made the changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
/unhold
// +optional | ||
InitialOffset string `json:"initialOffset,omitempty"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is optional should it be a pointer *string
?
@@ -72,6 +72,10 @@ type KafkaSourceSpec struct { | |||
// +optional | |||
ConsumerGroup string `json:"consumerGroup,omitempty"` | |||
|
|||
// InitialOffset is the Initial Offset for the consumer group. | |||
// +optional | |||
InitialOffset *int64 `json:"initialOffset,omitempty"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
/unhold
thanks @itsmurugappan! Can you also update the documentation (if not done already), or a least open an issue in /approve |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: itsmurugappan, lionelvillard The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
docs pr -> knative/docs#4080 |
/test pull-knative-sandbox-eventing-kafka-integration-test-channel-consolidated |
/test all |
firs time I see this flake in the scheduler. I'll take a look when I get a chance /test pull-knative-sandbox-eventing-kafka-unit-tests |
@matzew are you ok with this? |
There is only one comment left #779 (comment) that still applies but at a different line https://github.com/knative-sandbox/eventing-kafka/pull/779/files#diff-aa0fe2380b2926d6809e7204b18c7c78f0d0158748106f0dca91a667ab700d21R88 |
The following is the coverage report on the affected files.
|
/lgtm |
Fixes #289
Proposed Changes
Release Note
/cc @lionelvillard