My use case: given one JMS queue and one Kafka topic with two partitions, route messages from the JMS queue to a specific partition of the Kafka topic based on the value of a custom header attribute.
Problem: Messages are routed from JMS to partition 0 of the Kafka topic only, never to partition 1, even though PARTITION_KEY and KEY are set in the headers (code below).
The dependencies (version 2.16.3), the steps, and the Java code I tried are below:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId> <!--**2.16.3** used-->
</dependency>
<dependency>
<groupId>org.apache.camel</groupId> <!--**2.16.3** used-->
<artifactId>camel-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId> <!--**2.16.3** used-->
<artifactId>camel-core</artifactId>
</dependency>
- Created a Kafka topic with two partitions:
./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic mytopic-with-partitions
- Listen to messages on Partition 0:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic-with-partitions --partition 0
- Listen to messages on Partition 1:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic-with-partitions --partition 1
import org.apache.camel.builder.RouteBuilder;

public class MyRouteBuilder extends RouteBuilder
{
    private final MessagePreprocessor messagePreProcessor = new MessagePreprocessor();

    @Override
    public void configure() throws Exception
    {
        // The preprocessor sets the Kafka partition and key headers before the message is sent.
        from("jms:queue:my-jms-queue").process(messagePreProcessor)
            .log("Routing message from my-jms-queue to Kafka topic mytopic-with-partitions")
            .to("kafka:localhost:9092?topic=mytopic-with-partitions&zookeeperHost=localhost&zookeeperPort=2181&serializerClass=kafka.serializer.StringEncoder");
    }
}
=============================
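import java.util.HashMap;
import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConstants;

public class MessagePreprocessor implements Processor
{
    @Override
    public void process(final Exchange exchange)
    {
        final String body = exchange.getIn().getBody(String.class);
        exchange.getOut().setBody(body);
        // Assumption: headers starts as a copy of the incoming message headers.
        final Map<String, Object> headers = new HashMap<>(exchange.getIn().getHeaders());
        // VANILLA goes to partition 0, every other flavor to partition 1.
        Integer partitionToUse = 0;
        final String flavorGot = (String) exchange.getIn().getHeader("FLAVOR");
        if ("VANILLA".equals(flavorGot)) {
            partitionToUse = 0;
        } else {
            partitionToUse = 1;
        }
        headers.put(KafkaConstants.PARTITION_KEY, partitionToUse);
        headers.put(KafkaConstants.KEY, flavorGot);
        exchange.getOut().setHeaders(headers);
    }
}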
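To rule out the mapping logic itself, the partition decision can be pulled out of the processor and checked without Camel or Kafka running. This is only a minimal sketch, assuming that VANILLA maps to partition 0 and anything else (including a missing FLAVOR header) maps to partition 1. PartitionChooser, partitionFor and buildHeaders are hypothetical names, and the header names are passed in as plain strings instead of being read from KafkaConstants:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Camel or Kafka.
public class PartitionChooser {

    // VANILLA -> partition 0, any other value (or null) -> partition 1.
    public static int partitionFor(String flavor) {
        return "VANILLA".equals(flavor) ? 0 : 1;
    }

    // Returns a copy of the incoming headers with the partition and key added.
    // partitionHeaderName and keyHeaderName are supplied by the caller; in the
    // route they would be KafkaConstants.PARTITION_KEY and KafkaConstants.KEY.
    public static Map<String, Object> buildHeaders(Map<String, Object> incoming,
                                                   String flavor,
                                                   String partitionHeaderName,
                                                   String keyHeaderName) {
        Map<String, Object> headers = new HashMap<>(incoming);
        headers.put(partitionHeaderName, partitionFor(flavor));
        headers.put(keyHeaderName, flavor);
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> incoming = new HashMap<>();
        incoming.put("FLAVOR", "CHOCOLATE");
        // Placeholder header names, used here only to show the resulting map.
        Map<String, Object> out =
            buildHeaders(incoming, "CHOCOLATE", "partitionHeader", "keyHeader");
        System.out.println(out);
    }
}
```

If partitionFor returns 1 for non-VANILLA flavors here, the mapping is not the cause, and what remains to check is whether the headers actually reach the Kafka endpoint.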
Output:
- Messages arrive on partition 0, as shown by the consumer command in step 2.
- Messages do NOT arrive on partition 1, as shown by the consumer command in step 3.
Appreciate any suggestions/pointers. Thanks