Fix the FileBasedDeadLetterQueueReconsumer dup issue #2476
base: main
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff             @@
##              main    #2476      +/-   ##
============================================
  Coverage     49.62%   49.62%
- Complexity     4808     5157      +349
============================================
  Files           941      941
  Lines         57661    57675       +14
  Branches       6233     6233
============================================
+ Hits          28614    28623        +9
- Misses        27007    27010        +3
- Partials       2040     2042        +2
PCollection<ResourceId> movedFiles =
    input
        .apply("MoveFiles", ParDo.of(new MoveFiles()))
I think this change introduces the possibility of data loss, which is worse. Specifically, we could run into the following scenario:
- Bundle x contains file-1, file-2, and file-3.
- file-1 is successfully renamed to tmp-file-1, but the bundle fails on renaming file-2.
- file-1 is now orphaned as tmp-file-1. I guess the way it is written, we would actually get recurrent errors (every retry of the bundle fails because file-1 no longer exists), which is a little better, but still not ideal.
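To make the failure mode concrete, here is a small, hypothetical simulation on a local filesystem using java.nio.file (not Beam, and not the PR's MoveFiles DoFn). The class and method names are illustrative only. A first bundle attempt renames file-1 and then dies; the retried bundle trips over the missing file-1 straight away, so every retry fails the same way while tmp-file-1 stays orphaned.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local-filesystem simulation of the review scenario; not code from the PR.
class NonIdempotentRenameDemo {

  // Renames every file in the bundle to "tmp-<name>", but throws (like a crashed
  // worker) once `failAfter` files have been renamed.
  static void moveBundle(Path dir, List<String> bundle, int failAfter) throws IOException {
    int renamed = 0;
    for (String name : bundle) {
      if (renamed == failAfter) {
        throw new IOException("simulated failure before renaming " + name);
      }
      // Throws NoSuchFileException if `name` was already renamed by an earlier attempt.
      Files.move(dir.resolve(name), dir.resolve("tmp-" + name), StandardCopyOption.ATOMIC_MOVE);
      renamed++;
    }
  }

  // Returns whether the retried bundle failed, plus the sorted directory contents.
  static String run() throws IOException {
    Path dir = Files.createTempDirectory("dlq-demo");
    List<String> bundle = List.of("file-1", "file-2", "file-3");
    for (String name : bundle) {
      Files.writeString(dir.resolve(name), "{}");
    }
    try {
      moveBundle(dir, bundle, 1); // first attempt dies after renaming file-1
    } catch (IOException expected) {
      // The runner would now retry the whole bundle.
    }
    boolean retryFailed = false;
    try {
      moveBundle(dir, bundle, Integer.MAX_VALUE); // retry without an injected failure
    } catch (NoSuchFileException e) {
      retryFailed = true; // file-1 is gone, so the retry fails on its first file
    }
    try (Stream<Path> files = Files.list(dir)) {
      List<String> names =
          files.map(p -> p.getFileName().toString()).sorted().collect(Collectors.toList());
      return "retryFailed=" + retryFailed + " files=" + names;
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(run());
  }
}
```

Running main prints retryFailed=true files=[file-2, file-3, tmp-file-1]: file-2 and file-3 are never consumed, and tmp-file-1 is left behind.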
Instead, I'd propose the following tweak:
- We get a timestamp FOO associated with each generate sequence firing. We already get this for free; we'd just need to propagate it through.
- Instead of renaming the file to tmp-<file name>, we rename it to tmp-FOO-<file name>.
- Instead of only looking for <file name> when doing the rename operation, we look for <file name> and rename it, or, if <file name> is not present, we look for tmp-FOO-<file name> and use that instead. If neither tmp-FOO-<file name> nor <file name> is present, we log and move on, assuming that something else has claimed the file.
So the algorithm would be:
TriggerConsumeDLQ, AsFilePattern, MatchFiles, <new extract_ts function> -> (ts, file) tuples
in MoveFiles:
  renamed_file_name = "tmp-${ts}-${original_file_name}"
  if exists(renamed_file_name):
    return renamed_file_name  # already claimed by an earlier attempt of this firing
  if !exists(original_file_name):
    log.warning('skipping, handled by different pass')
    return  # emit nothing for this file
  mv(original_file_name, renamed_file_name)
  return renamed_file_name
Then the rest of the logic would stay the same.
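The proposed MoveFiles logic can be sketched in Java against java.nio.file on a local filesystem. Everything here is hypothetical: TimestampedClaim, claim, and the long ts parameter stand in for the proposed extract_ts output and MoveFiles step, not for Beam's FileSystems API or anything in the PR. The sketch also covers one case the pseudocode leaves implicit: another pass renaming the original between the exists check and the move.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical sketch of the timestamped, retry-safe claim proposed in the review.
class TimestampedClaim {
  private static final Logger LOG = Logger.getLogger(TimestampedClaim.class.getName());

  // Claims `original` for the trigger firing identified by `ts`. Returns the claimed
  // path, or empty if some other pass already took the file.
  static Optional<Path> claim(Path original, long ts) throws IOException {
    Path renamed = original.resolveSibling("tmp-" + ts + "-" + original.getFileName());
    if (Files.exists(renamed)) {
      return Optional.of(renamed); // an earlier attempt of this firing already moved it
    }
    if (!Files.exists(original)) {
      LOG.warning("skipping " + original + ", handled by different pass");
      return Optional.empty();
    }
    try {
      Files.move(original, renamed, StandardCopyOption.ATOMIC_MOVE);
    } catch (NoSuchFileException e) {
      // Lost a race: another pass renamed the file after the exists check.
      LOG.warning("skipping " + original + ", claimed concurrently");
      return Optional.empty();
    }
    return Optional.of(renamed);
  }

  private static String name(Optional<Path> p) {
    return p.map(x -> x.getFileName().toString()).orElse("none");
  }

  // First attempt, retry of the same firing, then a later firing.
  static String demo() throws IOException {
    Path dir = Files.createTempDirectory("dlq-claim");
    Path original = Files.writeString(dir.resolve("file-1"), "{}");
    Optional<Path> first = claim(original, 42L);
    Optional<Path> retry = claim(original, 42L);
    Optional<Path> later = claim(original, 43L);
    return name(first) + " " + name(retry) + " " + name(later);
  }

  public static void main(String[] args) throws IOException {
    System.out.println(demo());
  }
}
```

demo() returns tmp-42-file-1 tmp-42-file-1 none: a retry of firing 42 reuses its own claim instead of failing, while a later firing (43) finds neither name, logs a warning, and moves on.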
        ParDo.of(new MoveAndConsumeFn(fileContents, fileMetadata))
            .withOutputTags(fileContents, TupleTagList.of(fileMetadata)));
results
    .get(fileMetadata)
-   .setCoder(MetadataCoder.of())
+   .setCoder(ResourceIdCoder.of())
I think this is inevitable, but we are breaking update compatibility. Maybe worth calling out in the PR title so that it gets into the release notes.
Fixes #2236
This PR changes the process from a "match -> read -> delete" sequence to a "match -> atomic rename -> read -> delete" sequence.
MoveFiles:
A new MoveFiles step is introduced, which renames each matched dead-letter file by adding a tmp- prefix to its name. This atomic "move" operation effectively claims the file, so subsequent scans no longer find the original file and cannot process it again.
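The claim-by-rename idea can be checked with a small local-filesystem sketch. ClaimByRename and matchAndClaim are hypothetical names used only for illustration; the PR's MoveFiles step runs inside a Beam pipeline. Scans skip names that already carry the tmp- prefix, so a second scan that starts before the first has read and deleted anything finds no files. This relies on the local filesystem's atomic rename; whether the production filesystem gives the same guarantee is an assumption the sketch does not test.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of "match -> atomic rename" on a local filesystem.
class ClaimByRename {
  static final String PREFIX = "tmp-";

  // Matches unclaimed dead-letter files and claims each one by renaming it.
  static List<Path> matchAndClaim(Path dir) throws IOException {
    List<Path> unclaimed;
    try (Stream<Path> files = Files.list(dir)) {
      unclaimed = files
          .filter(p -> !p.getFileName().toString().startsWith(PREFIX))
          .sorted()
          .collect(Collectors.toList());
    }
    List<Path> claimed = new ArrayList<>();
    for (Path p : unclaimed) {
      Path target = p.resolveSibling(PREFIX + p.getFileName());
      try {
        Files.move(p, target, StandardCopyOption.ATOMIC_MOVE);
        claimed.add(target);
      } catch (NoSuchFileException e) {
        // Another scan renamed it first, so that scan owns the file now.
      }
    }
    return claimed;
  }

  // Two back-to-back scans, the second running before any read/delete happens.
  static String demo() throws IOException {
    Path dir = Files.createTempDirectory("dlq-scan");
    Files.writeString(dir.resolve("file-1"), "{}");
    Files.writeString(dir.resolve("file-2"), "{}");
    List<Path> first = matchAndClaim(dir);
    List<Path> second = matchAndClaim(dir);
    return first.size() + " " + second.size();
  }
}
```

demo() returns 2 0: the first scan claims both files, and the overlapping second scan sees only tmp- names and claims none, which is the duplicate read that the old "match -> read -> delete" sequence allowed.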
After the files are moved, a Reshuffle transform is applied to guarantee that all rename operations have completed before the files are read.