Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I've been working on updating a Flink processor (Flink version 1.9) that reads from Kafka and then writes to Kafka. We have written this processor to run towards a Kafka 0.10.2 cluster and now we have deployed a new Kafka cluster running version 2.2. Therefore I set out to update the processor to use the latest FlinkKafkaConsumer and FlinkKafkaProducer (as suggested by the Flink docs). However I've run into some problems with the Kafka producer. I'm unable to get it to Serialize data using deprecated constructors (not surprising) and I've been unable to find any implementations or examples online about how to implement a Serializer (all the examples are using older Kafka Connectors)

The current implementation (for Kafka 0.10.2) is as follows

FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                (FlinkKafkaPartitioner) null
        );

When trying to implement the following FlinkKafkaProducer

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                null
        );

I get the following error:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
    at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)

and I haven't been able to figure out why. The constructor for FlinkKafkaProducer is also deprecated and when I try implementing the non-deprecated constructor I can't figure out how to serialize the data. The following is how it would look:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                        return null;
                    }
                },
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );

But I don't understand how to implement the KafkaSerializationSchema and I find no examples of this online or in the Flink docs.

Does anyone have any experience implementing this or any tips on why the FlinkProducer gets NullPointerException in the step?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
186 views
Welcome To Ask or Share your Answers For Others

1 Answer

If you are just sending String to Kafka:

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String>{

    private String topic;   

    public ProducerStringSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
    }

}

For sending a Java Object:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;


    public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo>{

        private String topic;   
        private ObjectMapper mapper;

        public ObjSerializationSchema(String topic) {
            super();
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) {
            byte[] b = null;
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
             try {
                b= mapper.writeValueAsBytes(obj);
            } catch (JsonProcessingException e) {
                // TODO 
            }
            return new ProducerRecord<byte[], byte[]>(topic, b);
        }

    }

In your code

.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic), 
                        params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...