Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I use the Debezium `key.field.name` option in my MySQL source connector to add a field to the message key of my topic.

The message looks like this after landing on the topic:

{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}:{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"[email protected]"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}

Here, the key is

{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}

and value is

{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"[email protected]"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}

In my sink connector (HdfsSinkConnector), I need to write the message-key field "__PKtableowner":"reviewDB.review.search_user_02" as a column or field in HDFS or Hive.

The only SMT I found is ValueToKey, but it doesn't fit my use case because it fetches fields from the value and not from the message key. I've tried InsertField, CreateKey, ExtractField, etc. (almost all of the transformations you can find here), but no luck. https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html

I'm looking for a KeyToValue kind of SMT, or any other workaround.

Below are my source and sink configurations. Source:

{
  "name": "REVIEW__MYSQL__search_user__source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.history.kafka.topic": "review.search_user_logs",
    "database.history.consumer.max.block.ms": "3000",
    "include.schema.changes": "false",
    "database.history.consumer.session.timeout.ms": "30000",
    "database.history.kafka.consumer.group": "compose-connect-group",
    "snapshot.new.tables": "parallel",
    "database.history.kafka.sasl.mechanism": "GSSAPI",
    "database.whitelist": "review",
    "database.history.producer.sasl.mechanism": "GSSAPI",
    "database.user": "root",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "time.precision.mode": "connect",
    "database.server.name": "reviewDB",
    "database.port": "3306",
    "database.history.consumer.heartbeat.interval.ms": "1000",
    "min.row.count.to.stream.results": "0",
    "database.hostname": "mysql",
    "database.password": "example",
    "database.history.consumer.sasl.mechanism": "GSSAPI",
    "snapshot.mode": "when_needed",
    "table.whitelist": "review.search_user_(.*)",
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "reviewDB.review.search_user_(.*)",
    "transforms.Reroute.topic.replacement": "search_user_all_shards",
    "transforms.Reroute.key.field.name": "__PKtableowner"
  }
}

Sink:

{ "name": "REVIEW__MYSQL__search_user__sink",
  "config":
  {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "topics.dir": "/_incr_files",
      "flush.size": 1,
      "tasks.max": 1,
      "timezone": "UTC",
      "rotate.interval.ms": 5000,
      "locale": "en",
      "hadoop.home": "/etc/hadoop",
      "logs.dir": "/_incr_files_wal",
      "hive.integration": "false",
      "partition.duration.ms": "20000",
      "hadoop.conf.dir": "/etc/hadoop",
      "topics": "search_user_all_shards",
      "hdfs.url": "hdfs://namenode:9000",
      "transforms": "unwrap,insertTopicOffset,insertTimeStamp",
      "transforms.insertTimeStamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.unwrap.drop.tombstones": "true",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.delete.handling.mode": "rewrite",
      "transforms.insertTimeStamp.timestamp.field": "spdb_landing_timestamp",
      "transforms.insertTopicOffset.offset.field": "spdb_topic_offset",
      "transforms.insertTopicOffset.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "schema.compatibility": "NONE",
      "path.format": "'partition'=YYYY-MM-dd-HH",
      "partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner"
  }
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
1.2k views
Welcome To Ask or Share your Answers For Others

1 Answer

Since your key is a Struct, the best way I'm aware of is this SMT, which effectively wraps the key and value into a new, nested value:

https://github.com/jcustenborder/kafka-connect-transform-archive


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...