这是indexloc提供的服务,不要输入任何密码
Skip to content
This repository was archived by the owner on Sep 2, 2024. It is now read-only.

add initial offset to kafka source spec #779

Merged
merged 8 commits into from
Aug 12, 2021

Conversation

itsmurugappan
Copy link
Contributor

@itsmurugappan itsmurugappan commented Jul 26, 2021

Fixes #289

Proposed Changes

  • Adds initial offset to the kafkasource spec

Release Note

🎁  - Provide an option for the user to specify the initial offset for a consumer group in Kafka source, this field is honored only if there are no prior offsets committed for the consumer group.

/cc @lionelvillard

@google-cla google-cla bot added the cla: yes Indicates the PR's author has signed the CLA. label Jul 26, 2021
@knative-prow-robot knative-prow-robot added the size/M Denotes a PR that changes 30-99 lines, ignoring generated files. label Jul 26, 2021
@codecov
Copy link

codecov bot commented Jul 26, 2021

Codecov Report

Merging #779 (a86f128) into main (c1d2251) will increase coverage by 0.36%.
The diff coverage is 90.90%.

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #779      +/-   ##
==========================================
+ Coverage   75.62%   75.98%   +0.36%     
==========================================
  Files         142      144       +2     
  Lines        6445     6518      +73     
==========================================
+ Hits         4874     4953      +79     
+ Misses       1350     1337      -13     
- Partials      221      228       +7     
Impacted Files Coverage Δ
pkg/apis/sources/v1beta1/kafka_types.go 66.66% <ø> (ø)
pkg/common/client/config.go 88.61% <75.00%> (-0.95%) ⬇️
pkg/apis/sources/v1beta1/kafka_defaults.go 100.00% <100.00%> (ø)
pkg/apis/sources/v1beta1/kafka_validation.go 76.66% <100.00%> (+3.58%) ⬆️
pkg/source/client/client.go 57.64% <100.00%> (+41.98%) ⬆️
...rce/reconciler/source/resources/receive_adapter.go 100.00% <100.00%> (ø)
test/rekt/resources/kafkachannel/kafkachannel.go 26.66% <0.00%> (ø)
test/rekt/resources/resetoffset/resetoffset.go 42.30% <0.00%> (ø)
...el/distributed/controller/kafkachannel/receiver.go 89.66% <0.00%> (+0.03%) ⬆️
.../distributed/controller/kafkachannel/dispatcher.go 90.25% <0.00%> (+0.08%) ⬆️
... and 2 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c1d2251...a86f128. Read the comment docs.

@knative-prow-robot knative-prow-robot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/M Denotes a PR that changes 30-99 lines, ignoring generated files. labels Jul 26, 2021
@@ -38,6 +39,7 @@ func TestMakeReceiveAdapter(t *testing.T) {
Spec: v1beta1.KafkaSourceSpec{
Topics: []string{"topic1,topic2"},
ConsumerGroup: "group",
InitialOffset: ptr.Int64(-2),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's -2 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

earliest, i will change to use the constants from sarama library

@matzew
Copy link
Contributor

matzew commented Jul 27, 2021

Isn't the official kafka (jvm) client just doing earliest and latest ?

@@ -72,6 +72,10 @@ type KafkaSourceSpec struct {
// +optional
ConsumerGroup string `json:"consumerGroup,omitempty"`

// InitialOffset is the Initial Offset for the consumer group.
// +optional
InitialOffset *int64 `json:"initialOffset,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not use sarama specific things in the API.

can we use earliest or latest as string?

/hold

Copy link
Contributor Author

@itsmurugappan itsmurugappan Jul 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how to give a specific offset in that case ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you plan to set a single offset for all partitions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, let me ask another question, as of now, if I set 50 here, what should happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I had a wrong assumption that we could also give a specific offset.
But looks like it needs to be earliest or latest. I will change the API to make the initial offset a string that will accept "earliest" or "latest"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have made the changes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

/unhold

@knative-prow-robot knative-prow-robot added the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Jul 27, 2021
Comment on lines 77 to 78
// +optional
InitialOffset string `json:"initialOffset,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is optional should it be a pointer *string?

@@ -72,6 +72,10 @@ type KafkaSourceSpec struct {
// +optional
ConsumerGroup string `json:"consumerGroup,omitempty"`

// InitialOffset is the Initial Offset for the consumer group.
// +optional
InitialOffset *int64 `json:"initialOffset,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

/unhold

@knative-prow-robot knative-prow-robot removed the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Jul 27, 2021
@lionelvillard
Copy link
Contributor

thanks @itsmurugappan!

Can you also update the documentation (if not done already), or a least open an issue in knative/docs?

/approve

@pierDipi @matzew for LGTM...

@knative-prow-robot
Copy link
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: itsmurugappan, lionelvillard

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow-robot knative-prow-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Jul 29, 2021
@itsmurugappan
Copy link
Contributor Author

docs pr -> knative/docs#4080

@itsmurugappan
Copy link
Contributor Author

/test pull-knative-sandbox-eventing-kafka-integration-test-channel-consolidated

@matzew
Copy link
Contributor

matzew commented Jul 30, 2021

/test all

@lionelvillard
Copy link
Contributor

firs time I see this flake in the scheduler. I'll take a look when I get a chance

/test pull-knative-sandbox-eventing-kafka-unit-tests

@lionelvillard
Copy link
Contributor

@matzew are you ok with this?

@pierDipi
Copy link
Member

pierDipi commented Aug 4, 2021

@knative-metrics-robot
Copy link

The following is the coverage report on the affected files.
Say /test pull-knative-sandbox-eventing-kafka-go-coverage to re-run this coverage report

File Old Coverage New Coverage Delta
pkg/apis/sources/v1beta1/kafka_validation.go 90.0% 90.9% 0.9
pkg/common/client/config.go 91.7% 91.3% -0.4
pkg/source/client/client.go 19.7% 59.0% 39.3

@lionelvillard
Copy link
Contributor

/lgtm

@knative-prow-robot knative-prow-robot added the lgtm Indicates that a PR is ready to be merged. label Aug 12, 2021
@knative-prow-robot knative-prow-robot merged commit fdf11c5 into knative-extensions:main Aug 12, 2021
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
approved Indicates a PR has been approved by an approver from all required OWNERS files. cla: yes Indicates the PR's author has signed the CLA. lgtm Indicates that a PR is ready to be merged. size/L Denotes a PR that changes 100-499 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Provide option for configuring initial offset in kafka source
6 participants