
Fix the FileBasedDeadLetterQueueReconsumer dup issue #2476


Open
wants to merge 2 commits into base: main


@liferoad (Contributor) commented Jun 22, 2025

Fixes #2236

This PR changes the process from a "match -> read -> delete" sequence to a "match -> atomic rename -> read -> delete" sequence.

MoveFiles:

A new MoveFiles step is introduced, which renames each matched dead-letter file by adding a tmp- prefix to its name. This atomic "move" operation effectively claims the file, ensuring that subsequent scans won't find the original file and attempt to process it again.

After the files are moved, a Reshuffle transform is applied. It breaks fusion and checkpoints the list of renamed files, so the renames are committed before any downstream step reads them.
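The "match -> atomic rename -> read -> delete" sequence can be illustrated with a minimal local-filesystem sketch. It uses `java.nio.file` as a stand-in for the Beam `FileSystems`/GCS calls that the real reconsumer goes through. The class and method names (`ClaimByRename`, `matchUnclaimed`, `claim`, `readAndDelete`) are hypothetical, and the Reshuffle step is not modeled.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical local-filesystem sketch of claim-by-rename. Assumption: java.nio.file
 * stands in for Beam's FileSystems API so the example runs on its own.
 */
class ClaimByRename {
  static final String TMP_PREFIX = "tmp-";

  /** "Match": list dead-letter files that have not been claimed (no tmp- prefix). */
  static List<Path> matchUnclaimed(Path dir) throws IOException {
    List<Path> matched = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
      for (Path p : stream) {
        if (!p.getFileName().toString().startsWith(TMP_PREFIX)) {
          matched.add(p);
        }
      }
    }
    return matched;
  }

  /** "MoveFiles": claim a file by atomically renaming it to tmp-<name>. */
  static Path claim(Path file) throws IOException {
    Path claimed = file.resolveSibling(TMP_PREFIX + file.getFileName());
    return Files.move(file, claimed, StandardCopyOption.ATOMIC_MOVE);
  }

  /** "Read -> delete": consume the claimed file, then remove it. */
  static List<String> readAndDelete(Path claimed) throws IOException {
    List<String> lines = Files.readAllLines(claimed);
    Files.delete(claimed);
    return lines;
  }
}
```

Once a file has been claimed, a second call to `matchUnclaimed` no longer returns it. That is the property the PR relies on to stop overlapping scans from reading the same file twice.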


codecov bot commented Jun 22, 2025

Codecov Report

Attention: Patch coverage is 88.46154% with 3 lines in your changes missing coverage. Please review.

Project coverage is 49.62%. Comparing base (43be033) to head (d893e98).
Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
...v2/cdc/dlq/FileBasedDeadLetterQueueReconsumer.java 88.46% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##               main    #2476    +/-   ##
==========================================
  Coverage     49.62%   49.62%            
- Complexity     4808     5157   +349     
==========================================
  Files           941      941            
  Lines         57661    57675    +14     
  Branches       6233     6233            
==========================================
+ Hits          28614    28623     +9     
- Misses        27007    27010     +3     
- Partials       2040     2042     +2     
Components Coverage Δ
spanner-templates 69.94% <ø> (-0.01%) ⬇️
spanner-import-export 68.61% <ø> (-0.03%) ⬇️
spanner-live-forward-migration 78.77% <ø> (ø)
spanner-live-reverse-replication 77.36% <ø> (ø)
spanner-bulk-migration 87.89% <ø> (ø)
Files with missing lines Coverage Δ
...v2/cdc/dlq/FileBasedDeadLetterQueueReconsumer.java 72.54% <88.46%> (+4.36%) ⬆️

... and 2 files with indirect coverage changes


@liferoad liferoad marked this pull request as ready for review July 12, 2025 19:54
@liferoad liferoad requested a review from damccorm July 12, 2025 19:54

PCollection<ResourceId> movedFiles =
    input
        .apply("MoveFiles", ParDo.of(new MoveFiles()))
Contributor

I think this change introduces the possibility of data loss, which is worse than duplicates. Specifically, we could run into the following scenario:

  1. Bundle x contains file-1, file-2, file-3
  2. file-1 is successfully renamed to tmp-file-1, but the bundle fails on renaming file-2

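file-1 is now orphaned as tmp-file-1: later scans won't match it, and a retry of the bundle won't find file-1. I guess, the way it is written, we would actually get recurring errors on retry, which is a little better than silent loss, but still not ideal.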
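The failure scenario above can be reproduced with a small local-filesystem simulation. `java.nio.file` stands in for the real file system, and `NaiveMoveBundle`/`processBundle` are hypothetical names. The first attempt renames file-1 and then fails on file-2. Any retry of the same bundle then fails as well, because file-1 no longer exists under its original name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

/** Hypothetical simulation of a bundle running the original, non-idempotent rename. */
class NaiveMoveBundle {
  /**
   * Renames every file in the bundle to tmp-<name>, in order. If a file's name equals
   * failOn, throws before renaming it to simulate a mid-bundle failure (pass null to never fail).
   */
  static void processBundle(List<Path> bundle, String failOn) throws IOException {
    for (Path file : bundle) {
      if (file.getFileName().toString().equals(failOn)) {
        throw new IOException("simulated failure renaming " + file.getFileName());
      }
      // Throws NoSuchFileException if an earlier attempt already renamed this file.
      Files.move(file, file.resolveSibling("tmp-" + file.getFileName()),
          StandardCopyOption.ATOMIC_MOVE);
    }
  }
}
```

After the first attempt, tmp-file-1 exists but no longer matches the unclaimed-file pattern, and the retry throws `NoSuchFileException` on file-1. These are the "recurring errors" described above.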

Instead, I'd propose the following tweak:

  1. We get a timestamp FOO associated with each GenerateSequence firing. We already get this for free; we'd just need to propagate it through.
  2. Instead of renaming the file tmp-<file name>, we rename it tmp-FOO-<file name>
  3. Instead of only looking for <file name> when doing the rename operation, we look for <file name> and rename it; if <file name> is not present, we look for tmp-FOO-<file name> and use that instead. If neither <file name> nor tmp-FOO-<file name> is present, we log and move on, assuming that something else has claimed the file.

So the algorithm would be:

TriggerConsumeDLQ, AsFilePattern, MatchFiles, <new extract_ts function> -> (ts, file) tuples
in MoveFiles:
   renamed_file_name = "tmp-${ts}-${original_file_name}"
   if exists(renamed_file_name):
      return renamed_file_name   # already claimed by this firing (e.g. a retried bundle)
   if !exists(original_file_name):
      log.warning('skipping, handled by different pass')
      return                     # claimed by another pass; nothing to read
   mv(original_file_name, renamed_file_name)
   return renamed_file_name

Then the rest of the logic would stay the same.
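The proposed MoveFiles logic could be sketched in Java roughly as follows. The sketch assumes `java.nio.file` as a stand-in for Beam's `FileSystems` API and takes the firing timestamp as a plain `long`. The class and method names (`IdempotentMoveFiles`, `claim`) are hypothetical, not taken from the PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.logging.Logger;

/**
 * Hypothetical sketch of the proposed idempotent claim: rename <file> to tmp-<ts>-<file>,
 * where ts is the timestamp of the firing that produced the match.
 * Assumption: java.nio.file stands in for Beam's FileSystems API.
 */
class IdempotentMoveFiles {
  private static final Logger LOG = Logger.getLogger(IdempotentMoveFiles.class.getName());

  /** Returns the claimed file to read, or empty if another pass already handled it. */
  static Optional<Path> claim(long ts, Path original) throws IOException {
    Path renamed = original.resolveSibling("tmp-" + ts + "-" + original.getFileName());
    if (Files.exists(renamed)) {
      // A retried bundle from the same firing already claimed this file: reuse it.
      return Optional.of(renamed);
    }
    if (!Files.exists(original)) {
      // Neither name exists: a different pass (different ts) claimed it.
      LOG.warning("skipping " + original + ", handled by a different pass");
      return Optional.empty();
    }
    Files.move(original, renamed, StandardCopyOption.ATOMIC_MOVE);
    return Optional.of(renamed);
  }
}
```

Because retries of the same firing reuse tmp-<ts>-<file>, a partially failed bundle can be replayed without orphaning files. The check-then-move is not itself atomic. If another pass renames the file between the two calls, `Files.move` throws, and a retry takes the "skip" branch instead of silently dropping the error.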

ParDo.of(new MoveAndConsumeFn(fileContents, fileMetadata))
    .withOutputTags(fileContents, TupleTagList.of(fileMetadata)));

results
    .get(fileMetadata)
-   .setCoder(MetadataCoder.of())
+   .setCoder(ResourceIdCoder.of())
Contributor

I think this is inevitable, but we are breaking update compatibility. It may be worth calling out in the PR title so that it gets into the release notes.

Development

Successfully merging this pull request may close these issues.

[Bug]: FileBasedDeadLetterQueueReconsumer could result in duplicates
2 participants