Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

Good morning

When I work with a HDFS2 connector sink, integrated with HIVE, the database table get the name of the topic. Is there a way to choice the name of the table?.

This is the config of my conector:

   "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
   "hive.integration": "true",
   "hive.database": "databaseEze",
   "hive.metastore.uris": "thrift://server1.dc.es.arioto:9083",
   "transforms.InsertField.timestamp.field": "carga",
   "flush.size": "100000",
   "tasks.max": "2",
   "timezone": "Europe/Paris",
   "transforms": "RenameField,InsertField,carga_format",
   "rotate.interval.ms": "900000",
   "locale": "en-GB",
   "logs.dir": "/logs",    
   "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
   "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
   "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "transforms.RenameField.renames": "var1:Test1,var2:Test2,var3:test3",
   "transforms.carga_format.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
   "transforms.carga_format.target.type": "string",
   "transforms.carga_format.format": "yyyyMMdd",
   "hadoop.conf.dir": "/etc/hadoop/",
   "schema.compatibility": "BACKWARD",
   "topics": "Skiel-Tracking-Replicator",
   "hdfs.url": "hdfs://database/user/datavaseEze/",
   "transforms.InsertField.topic.field": "ds_topic",    
   "partition.field.name": "carga",
   "transforms.InsertField.partition.field": "test_partition",
   "value.converter.schema.registry.url": "http://schema-registry-eze-dev.ocjc.serv.dc.es.arioto",
   "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
   "name": "KAFKA-HDFS-HIVE-TEST",
   "transforms.fx_carga_format.field": "carga",
   "transforms.InsertField.offset.field": "test_offset"
}

With that config, the table will name **Skiel-Tracking-Replicator** and I want that the table name will be d9nvtest.

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
534 views
Welcome To Ask or Share your Answers For Others

1 Answer

You can use the RegexRouter Single Message Transform to modify the topic name.

{
"transforms"                        : "renameTopic",
"transforms.renameTopic.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.renameTopic.regex"      : "Skiel-Tracking-Replicator",
"transforms.renameTopic.replacement": "d9nvtest"
}

See https://rmoff.net/2020/12/11/twelve-days-of-smt-day-4-regexrouter/


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...