
[FLINK-38928] Implement an operator for handling DO ERROR/NOTHING (#2… #27602

Open

dawidwys wants to merge 3 commits into apache:master from dawidwys:flink38928-2

Conversation

@dawidwys
Contributor

Second attempt at #27502.

@flinkbot
Collaborator

flinkbot commented Feb 13, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@dawidwys
Contributor Author

@twalthr @pnowojski Could you take a look at this PR? I tried addressing @pnowojski's concerns.

buffer.put(timestamp, records);

timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp + 1);
Contributor

nit: could you explain in the comment how this +1 here and -1 in timer firing is supposed to work? I get it, but I think it would be good to explain it for the future.
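
For future readers, a hedged sketch of how the +1 here and the -1 at firing time presumably fit together; the onEventTime body and the point lookup below are assumptions for illustration, not the PR's exact code:

// Records are stamped with the last watermark W seen on the channel, so records carrying
// stamp W can keep arriving until the next watermark advances past W. An event-time timer
// fires once the watermark reaches its timestamp, so registering at W + 1 defers the flush
// until the watermark is strictly greater than W; the firing side subtracts 1 again to
// find the buffered entry it belongs to.
@Override
public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) throws Exception {
    long bufferedTimestamp = timer.getTimestamp() - 1;      // undo the +1 from registration
    List<RowData> records = buffer.get(bufferedTimestamp);  // assumed MapState<Long, List<RowData>>
    if (records != null) {
        // ... emit or compact the records that became final at bufferedTimestamp
        buffer.remove(bufferedTimestamp);
    }
}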

Contributor

I remember some code paths where we emit Long.MAX_VALUE on end_of_input; can we add a check that this increment doesn't result in overflow?

Contributor Author

I don't see a point in that. Once you emit MAX_VALUE there won't be any new records afterwards anyway, so there will be no records with timestamp MAX_VALUE and we don't care whether a timer fires or not.
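
For reference, the wraparound behind the question, as a standalone illustration (not code from the PR):

// Java long arithmetic wraps silently: Long.MAX_VALUE + 1 == Long.MIN_VALUE. The author's
// argument is that a Long.MAX_VALUE watermark only appears at end of input, so no record
// can carry that timestamp and it does not matter whether the wrapped timer ever fires.
long wrapped = Long.MAX_VALUE + 1; // == Long.MIN_VALUE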

Contributor

@pnowojski left a comment

LGTM % I would like @rkhachatryan to also take a look here before merging.

I also presume we don't need a feature toggle for this one, as users would have to manually change the conflict resolution strategy in their schemas/tables for this change to take effect. Right?

@dawidwys
Contributor Author

I also presume we don't need a feature toggle for this one, as users would have to manually change the conflict resolution strategy in their schemas/tables for this change to take effect. Right?

Correct

Contributor

@rkhachatryan left a comment

Thanks for the PR!
I've left some comments - PTAL (sorry if some questions were already asked on the PR).

Meta remark:
During our previous discussions around SinkUpsertMaterializer, my understanding was that we'll implement compaction on watermark on top of the existing implementation (be it sum v1 or v2).

I'm fine with adding a 3rd one, but I must say it complicates not only the code, but also the operations for the user.

On testing:
If the bugs I described are real, we should probably plug in the existing testing code for SUM V1/V2; it was extended significantly for FLIP-544.

Comment on lines +362 to +376
if (isErrorOrNothingConflictStrategy()) {
    // Use input parallelism to preserve watermark semantics
    transformForMaterializer =
            ExecNodeUtil.createOneInputTransformation(
                    inputTransform,
                    createTransformationMeta(
                            WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
                            "WatermarkTimestampAssigner",
                            "WatermarkTimestampAssigner",
                            config),
                    new WatermarkTimestampAssigner(),
                    inputTransform.getOutputType(),
                    inputTransform.getParallelism(),
                    false);
}
Contributor

Can we validate somehow that the watermarks are generated at all?

Contributor Author

The plan was to add it in a subsequent PR.

pkFieldNames);
}

// Use existing logic for DEDUPLICATE (legacy behavior)
Contributor

Why don't we apply compaction on watermark to the default behavior?

Contributor Author

Why would we?

Comment on lines +413 to +416
if (isErrorOrNothingConflictStrategy()) {
    RowType keyType = RowTypeUtils.projectRowType(physicalRowType, primaryKeys);

    return WatermarkCompactingSinkMaterializer.create(
Contributor

Is this mode compatible with ReducingUpsertSink?

Contributor Author

Yes, why wouldn't it?


@Override
public void serialize(Long record, DataOutputView target) throws IOException {
    target.writeLong(MathUtils.flipSignBit(record));
Contributor

This places positive watermarks before negative ones, right? (because we flip + to -)

For example:

original: [9223372036854775807, -3, 0, -9223372036854775808, 3, 2, 1, -1, -2]

output:
0 [-128, 0, 0, 0, 0, 0, 0, 0]
1 [-128, 0, 0, 0, 0, 0, 0, 1]
2 [-128, 0, 0, 0, 0, 0, 0, 2]
3 [-128, 0, 0, 0, 0, 0, 0, 3]
9223372036854775807 [-1, -1, -1, -1, -1, -1, -1, -1]
-9223372036854775808 [0, 0, 0, 0, 0, 0, 0, 0]
-3 [127, -1, -1, -1, -1, -1, -1, -3]
-2 [127, -1, -1, -1, -1, -1, -1, -2]
-1 [127, -1, -1, -1, -1, -1, -1, -1]

The result is sorted using Arrays.sort(outs, Arrays::compare); where outs is byte[][] and holds serialization results.

This sounds like a bug or a potential bug.
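
A self-contained way to reproduce the ordering above (a hypothetical check, not part of the PR; it assumes serialization through DataOutputSerializer and that the byte arrays are compared with the signed Arrays::compare, as in the experiment):

import java.util.Arrays;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.util.MathUtils;

public class SortedLongOrderCheck {
    public static void main(String[] args) throws Exception {
        long[] values = {Long.MAX_VALUE, -3, 0, Long.MIN_VALUE, 3, 2, 1, -1, -2};
        byte[][] outs = new byte[values.length][];
        for (int i = 0; i < values.length; i++) {
            DataOutputSerializer out = new DataOutputSerializer(8);
            out.writeLong(MathUtils.flipSignBit(values[i]));
            outs[i] = out.getCopyOfBuffer();
        }
        // Signed byte comparison: the flip sets the sign bit for non-negative longs, so
        // their first byte is negative and they sort before the negative longs.
        Arrays.sort(outs, Arrays::compare);
        // An unsigned comparator restores the numeric order MIN_VALUE .. MAX_VALUE:
        // Arrays.sort(outs, Arrays::compareUnsigned);
    }
}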

Contributor Author

Are you suggesting timers don't work in Flink? This is copied from org.apache.flink.streaming.api.operators.TimerSerializer

Contributor

Yeah, I think timers may not work correctly in Flink for negative time (which we probably haven't had so far).

import java.util.Random;

/** A test for the {@link SortedLongSerializer}. */
class SortedLongSerializerTest extends SerializerTestBase<Long> {
Contributor

Please add some sorting tests

Contributor Author

It's copied from org.apache.flink.streaming.api.operators.TimerSerializer

Contributor

I know :)

Comment on lines +368 to +371
} else if (result.isEmpty()) {
    if (previousValue != null) {
        emit(previousValue, DELETE);
        currentValue.clear();
Contributor

So the next time we get +I, we'll emit it again. This makes sense, but I think it is inconsistent with the NOTHING strategy description:

Keep the first record that arrives for a given primary key, discard subsequent conflicts.

Contributor Author

No, it is not. If there was a deletion, the value is deleted: there is a point in time where there was NO value. The NOTHING strategy is applied when there are two ACTIVE values at the same time.

if (previousValue != null) {
    records.add(previousValue);
}
Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
Contributor

For every timer timestamp X, we should know exactly the time X-1 when the record was added, right?

Why do we need to iterate over the whole state here?
Can't we use a point lookup (which is MUCH less expensive than iteration)?

Contributor Author

Theoretically you're correct. Still I'd say it's safer to iterate over the records. In a common scenario it should not matter much as there should not be many parallel watermarks flowing through a channel.
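
A hedged sketch of the iterate-and-drain variant being defended here; the surrounding shapes are assumptions (buffer as a MapState&lt;Long, List&lt;RowData&gt;&gt;, firingTimestamp as the timer's timestamp minus 1):

// Flush every buffered epoch that can no longer receive records. With only a handful of
// watermarks in flight per key this touches few entries, at the cost of scanning the
// state instead of doing a single point lookup.
Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
while (iterator.hasNext()) {
    Map.Entry<Long, List<RowData>> entry = iterator.next();
    if (entry.getKey() <= firingTimestamp) {
        records.addAll(entry.getValue()); // collect records that became final
        iterator.remove();                // and drop them from state
    }
}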

switch (pendingRecord.getRowKind()) {
    case INSERT:
    case UPDATE_AFTER:
        addRow(records, pendingRecord);
Contributor

This call is O(N), so the innermost loop is O(N^2).
Why don't we use a hashmap instead of a linear findFirst?

Contributor Author

This bit is copied over from SUM v1.
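
For context, a hedged sketch of the SUM v1 style linear helper being referred to; the method shape, the equaliser field, and the backwards scan are assumptions for illustration:

// Each retractRow (and the matching lookup in addRow) scans the buffered list with a
// generated RecordEqualiser, so handling N pending records within one epoch costs up to
// O(N^2) row comparisons.
private void retractRow(List<RowData> records, RowData pendingRecord) {
    for (int i = records.size() - 1; i >= 0; i--) {
        if (equaliser.equals(records.get(i), pendingRecord)) {
            records.remove(i); // drop one matching occurrence and stop
            return;
        }
    }
}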

        break;
    case UPDATE_BEFORE:
    case DELETE:
        retractRow(records, pendingRecord);
Contributor

If +I and -D arrive within the same watermark, then we won't emit anything (which makes sense).
But the NOTHING strategy says that we'll emit the first record that arrived; should we clarify that in the Javadoc/docs?

Contributor Author

@dawidwys Feb 20, 2026

I don't think what you're saying is true.

NOTHING is applied when there is a conflict between active keys. If, as you said, a value is retracted, meaning it no longer exists, then emitting no value is appropriate.


@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
    element.setTimestamp(currentWatermark);
Contributor

Is this timestamp emitted (to Kafka or other sinks)?
If so, should we let the users know that the timestamp is adjusted in these modes?

Contributor Author

No, it's not.
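
To make the thread above concrete, a minimal sketch of what a WatermarkTimestampAssigner-style operator presumably looks like; only the processElement line quoted above comes from the PR, the rest (imports and the processWatermark bookkeeping) is an assumption:

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;

public class WatermarkTimestampAssigner extends AbstractStreamOperator<RowData>
        implements OneInputStreamOperator<RowData, RowData> {

    private long currentWatermark = Long.MIN_VALUE;

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Remember the latest watermark so subsequent records can be stamped with it.
        currentWatermark = mark.getTimestamp();
        super.processWatermark(mark); // forward the watermark unchanged
    }

    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        // Stamp the record with the last watermark seen; per the reply above, this
        // timestamp is internal bookkeeping for the materializer, not data sent to sinks.
        element.setTimestamp(currentWatermark);
        output.collect(element);
    }
}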

@dawidwys
Contributor Author

Meta remark:
During our previous discussions around SinkUpsertMaterializer, my understanding was that we'll implement compaction on watermark on top of the existing implementation (be it sum v1 or v2).
I'm fine with adding a 3rd one, but I must say it complicates not only the code, but also the operations for the user.

This is the first time I hear of that. I can't find any such comments in the FLIP discussion. Moreover, I can't see how that could be possible, since we're slightly changing the semantics. Lastly, adding watermark compaction to the existing SUM would not help with the state size; it would still need to keep the entire history.
