我需要使用confluent/kafka将Server数据库从on位置迁移到GoogleCloud。
我确实有一个源debezium连接器
{
"name": "mssql_src",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "source_dbname.dbo(.*)",
"transforms.Reroute.topic.replacement": "target_dbname$1"
}
重路由转换不适用于解包的连接,我仍然获得source_dbname.dbo.*主题而不是target_dbname.*
我需要插入数据到target_dbname数据库jdbc同步连接器有以下配置
{
"name": "mssql_trg",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "source_dbname.dbo.*",
"table.name.format": "${topic}",
"connection.url": "jdbc:sqlserver://xxx.xxx.xxx.xxx:1433;DatabaseName=rocketlawyer3",
"connection.user": "sqlserver",
"connection.password": "sqlserver",
"dialect.name": "SqlServerDatabaseDialect",
"insert.mode": "upsert",
"auto.create": true,
"auto.evolve": true,
"pk.mode": "record_value"
}
显然,它失败了,因为所有SQL操作都将表引用为source_database_name.dbo.table_name。
以下是两个问题:
发布于 2020-12-11 23:38:31
您需要使用
链式变换
。只需将
unwrap
和
Rerout
SMT组合在一起:
{
"name": "mssql_src",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "unwrap, Reroute",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",