这是indexloc提供的服务,不要输入任何密码
Skip to content

Add SQL Server support for Datastream replication events #2498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,18 @@ record = new ObjectMapper().readTree(c.element());
outputObject.put("_metadata_log_file", getSourceMetadata(record, "log_file"));
outputObject.put("_metadata_log_position", getSourceMetadataAsLong(record, "log_position"));
} else if (sourceType.equals("backfill") || sourceType.equals("cdc")) {
// MongoDB Specific Metadata, MongoDB has different structure for sourceType.
outputObject.put("_metadata_timestamp_seconds", getSecondsFromMongoSortKeys(record));
outputObject.put("_metadata_timestamp_nanos", getNanosFromMongoSortKeys(record));
// SQL Server (has replication_index) or MongoDB
JsonNode sourceMetadata = getSourceMetadata(record);
if (sourceMetadata != null && sourceMetadata.has("replication_index")) {
// SQL Server Specific Metadata
outputObject.put("_metadata_schema", getSourceMetadata(record, "schema"));
outputObject.put("_metadata_lsn", getSourceMetadata(record, "lsn"));
outputObject.put("_metadata_tx_id", getSourceMetadata(record, "tx_id"));
} else {
// MongoDB Specific Metadata, MongoDB has different structure for sourceType.
outputObject.put("_metadata_timestamp_seconds", getSecondsFromMongoSortKeys(record));
outputObject.put("_metadata_timestamp_nanos", getNanosFromMongoSortKeys(record));
}
} else {
// Oracle Specific Metadata
outputObject.put("_metadata_schema", getSourceMetadata(record, "schema"));
Expand Down Expand Up @@ -224,6 +233,10 @@ private JsonNode getPrimaryKeys(JsonNode record) {
return null;
}

// For SQL Server, primary keys are in replication_index
if (md.has("replication_index")) {
return md.get("replication_index");
}
return md.get("primary_keys");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,18 @@ public FailsafeElement<String, String> apply(GenericRecord record) {
outputObject.put("_metadata_lsn", getPostgresLsn(record));
outputObject.put("_metadata_tx_id", getPostgresTxId(record));
} else if (sourceType.equals("backfill") || sourceType.equals("cdc")) {
// MongoDB Specific Metadata, MongoDB has different structure for sourceType.
outputObject.put("_metadata_timestamp_seconds", getSecondsFromMongoSortKeys(record));
outputObject.put("_metadata_timestamp_nanos", getNanosFromMongoSortKeys(record));
// SQL Server (has replication_index) or MongoDB
JsonNode sourceMetadata = getSourceMetadataJson(record);
if (sourceMetadata != null && sourceMetadata.has("replication_index")) {
// SQL Server Specific Metadata
outputObject.put("_metadata_schema", getSourceMetadata(record, "schema"));
outputObject.put("_metadata_lsn", getSourceMetadata(record, "lsn"));
outputObject.put("_metadata_tx_id", getSourceMetadata(record, "tx_id"));
} else {
// MongoDB Specific Metadata, MongoDB has different structure for sourceType.
outputObject.put("_metadata_timestamp_seconds", getSecondsFromMongoSortKeys(record));
outputObject.put("_metadata_timestamp_nanos", getNanosFromMongoSortKeys(record));
}
} else {
// Oracle Specific Metadata
outputObject.put("_metadata_schema", getMetadataSchema(record));
Expand Down Expand Up @@ -272,6 +281,11 @@ private JsonNode getPrimaryKeys(GenericRecord record) {
}

JsonNode dataInput = getSourceMetadataJson(record);
// For SQL Server, primary keys are in replication_index
if (sourceMetadata.getSchema().getField("replication_index") != null) {
return dataInput.get("replication_index");
}

return dataInput.get("primary_keys");
}

Expand Down Expand Up @@ -663,6 +677,22 @@ static void handleDatastreamRecordType(
}
jsonObject.put(fieldName, convertedIntervalNano);
break;
case "datetime":
// Handle SQL Server datetime type - similar to MySQL datetime
// Convert to timestamp string using date (days since epoch) and time (microseconds since
// midnight)
Long totalMicros = TimeUnit.DAYS.toMicros(Long.valueOf(element.get("date").toString()));
totalMicros += Long.valueOf(element.get("time").toString());
Instant timestampInstant =
Instant.ofEpochSecond(
TimeUnit.MICROSECONDS.toSeconds(totalMicros),
TimeUnit.MICROSECONDS.toNanos(totalMicros % TimeUnit.SECONDS.toMicros(1)));
jsonObject.put(
fieldName,
timestampInstant
.atOffset(ZoneOffset.UTC)
.format(DEFAULT_TIMESTAMP_WITH_TZ_FORMATTER));
break;
default:
ObjectMapper mapper = new ObjectMapper();
JsonNode dataInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,15 @@ public List<String> getSortFields() {
return Arrays.asList("_metadata_timestamp", "_metadata_log_file", "_metadata_log_position");
} else if (this.getSourceType().equals("postgresql")) {
return Arrays.asList("_metadata_timestamp", "_metadata_lsn", "_metadata_uuid");
} else if (this.getSourceType().equals("backfill") || this.getSourceType().equals("cdc")) {
// Check if it's SQL Server by looking for replication_index in source metadata
String sourceMetadata = getStringValue("_metadata_source");
if (sourceMetadata != null && sourceMetadata.contains("replication_index")) {
// SQL Server detected - same sort fields as PostgreSQL
return Arrays.asList("_metadata_timestamp", "_metadata_lsn", "_metadata_uuid");
}
// Default for non-SQL Server backfill/cdc (e.g., MongoDB)
return Arrays.asList("_metadata_timestamp");
} else {
// Current default is oracle.
return Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,36 @@ public void testLogicalType_micros() {
assertTrue(jsonObject.get(fieldNamePositiveNumber).asText().equals("1981-11-21T11:45:11Z"));
}

@Test
public void testParseSqlServerRecords() throws IOException, URISyntaxException {
  // Verifies that a SQL Server Datastream Avro record is converted to JSON with
  // the expected SQL Server-specific metadata fields and payload values.
  URL resource =
      getClass()
          .getClassLoader()
          .getResource("FormatDatastreamRecordToJsonTest/sqlserver_test.avro");
  File file = new File(resource.toURI());
  DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
  // try-with-resources: the original closed the reader manually, which leaked the
  // file handle whenever an assertion above the close() call failed.
  try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader)) {
    // Test INSERT record (backfill)
    GenericRecord record = dataFileReader.next();
    String jsonData = FormatDatastreamRecordToJson.create().apply(record).getOriginalPayload();
    ObjectMapper mapper = new ObjectMapper();
    JsonNode changeEvent = mapper.readTree(jsonData);

    // Verify SQL Server metadata fields
    assertEquals("dbo", changeEvent.get("_metadata_schema").asText());
    assertEquals("test_table", changeEvent.get("_metadata_table").asText());
    assertEquals("00000027:00000130:0001", changeEvent.get("_metadata_lsn").asText());
    assertEquals("", changeEvent.get("_metadata_tx_id").asText());
    assertEquals("backfill", changeEvent.get("_metadata_read_method").asText());

    // Verify payload
    assertEquals(1, changeEvent.get("id").asInt());
    assertEquals("Test User 1", changeEvent.get("name").asText());
  }
}

@Test
public void testIntervalNano() throws JsonProcessingException {

Expand Down
Binary file not shown.