Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,26 @@ public static Schema.FieldType beamSchemaTypeFromKafkaType(
}

public static Instant debeziumRecordInstant(SourceRecord record) {
if (!record.valueSchema().type().equals(org.apache.kafka.connect.data.Schema.Type.STRUCT)
|| record.valueSchema().field("ts_ms") == null) {
throw new IllegalArgumentException(
"Debezium record received is not of the right kind. "
+ String.format(
"Should be STRUCT with ts_ms field. Instead it is: %s", record.valueSchema()));
if (record.valueSchema() != null
&& record.valueSchema().type().equals(org.apache.kafka.connect.data.Schema.Type.STRUCT)
&& record.valueSchema().field("ts_ms") != null
&& record.value() != null) {
Struct recordValue = (Struct) record.value();
return Instant.ofEpochMilli(recordValue.getInt64("ts_ms"));
}
Struct recordValue = (Struct) record.value();
return Instant.ofEpochMilli(recordValue.getInt64("ts_ms"));

if (record.sourceOffset() != null && record.sourceOffset().containsKey("ts_usec")) {
Object tsUsecValue = record.sourceOffset().get("ts_usec");
if (tsUsecValue instanceof Number) {
return Instant.ofEpochMilli(((Number) tsUsecValue).longValue() / 1000);
}
}

throw new IllegalArgumentException(
"Debezium record received is not of the right kind. "
+ String.format(
"Should be STRUCT with ts_ms field or sourceOffset with ts_usec. Instead it is: %s, %s",
record.valueSchema(), record.sourceOffset()));
}

public static SourceRecordMapper<Row> beamRowFromSourceRecordFn(final Schema recordSchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;

import java.util.Collections;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -59,11 +62,45 @@ public void testSimpleSourceRecordSchemaConversion() {

@Test
public void testTimestampRequired() {
org.apache.kafka.connect.source.SourceRecord record = SourceRecordJsonTest.buildSourceRecord();
SourceRecord record = SourceRecordJsonTest.buildSourceRecord();

IllegalArgumentException e =
assertThrows(
IllegalArgumentException.class, () -> KafkaConnectUtils.debeziumRecordInstant(record));
assertThat(e.getMessage(), Matchers.containsString("Should be STRUCT with ts_ms field"));
assertThat(
e.getMessage(),
Matchers.containsString("Should be STRUCT with ts_ms field or sourceOffset with ts_usec"));
}

@Test
public void testDebeziumRecordInstantNullValueSchema() {
SourceRecord record =
new SourceRecord(
Collections.singletonMap("server", "test"),
Collections.singletonMap("ts_usec", 1614854400000000L),
"test-topic",
null,
null);

Instant instant = KafkaConnectUtils.debeziumRecordInstant(record);
assertThat(instant.getMillis(), Matchers.is(1614854400000L));
}

@Test
public void testDebeziumRecordInstantMissingTimestamp() {
SourceRecord record =
new SourceRecord(
Collections.singletonMap("server", "test"),
Collections.emptyMap(),
"test-topic",
null,
null);

IllegalArgumentException e =
assertThrows(
IllegalArgumentException.class, () -> KafkaConnectUtils.debeziumRecordInstant(record));
assertThat(
e.getMessage(),
Matchers.containsString("Should be STRUCT with ts_ms field or sourceOffset with ts_usec"));
}
}
Loading