Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

I'm using Apache Flink (v1.11) with Scala and have added my own DeserializationSchema for the Kafka connector. For this I would like to use my own Jackson packages and version (v2.12.0).

But I got the following error:

Exception in thread "main" java.lang.VerifyError: Cannot inherit from final class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at com.fasterxml.jackson.dataformat.csv.CsvMapper.<init>(CsvMapper.java:108)
    at de.integration_factory.datastream.types.CovidEventSchema.<init>(CovidEventSchema.scala:14)
    at de.integration_factory.datastream.Aggregate_Datastream$.main(Aggregate_Datastream.scala:34)
    at de.integration_factory.datastream.Aggregate_Datastream.main(Aggregate_Datastream.scala)

This is my EventSchema:

import java.io.IOException

import com.fasterxml.jackson.dataformat.csv.CsvMapper
import com.fasterxml.jackson.datatype.joda.JodaModule
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation

@SerialVersionUID(6154188370181669758L)
class CovidEventSchema extends DeserializationSchema[CovidEvent] with SerializationSchema[CovidEvent] {

  private val mapper = new CsvMapper
  mapper.registerModule(new JodaModule)

  val csvSchema = mapper
    .schemaFor(classOf[CovidEvent])
    .withLineSeparator(",")
    .withoutHeader()
  val  reader = mapper.readerWithSchemaFor(classOf[CovidEvent])

  def serialize(event: CovidEvent): Array[Byte] = mapper.writer(csvSchema).writeValueAsBytes(event)

  @throws[IOException]
  def deserialize(message: Array[Byte]): CovidEvent = reader.readValue[CovidEvent](message)


  def isEndOfStream(nextElement: CovidEvent) = false

  def getProducedType: TypeInformation[CovidEvent] = TypeInformation.of(classOf[CovidEvent])
}

This is my POJO for the schema:

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.joda.time.DateTime;


@Data
@NoArgsConstructor
@AllArgsConstructor
public class CovidEvent {

    private long objectId;
    private int bundeslandId;
    private String bundesland;
    private String landkreis;
    private String altersgruppe;
    private String geschlecht;
    private int anzahlFall;
    private int anzahlTodesfall;
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "UTC")
    private DateTime meldedatum;
    private int landkreisId;
    private String datenstand;
    private int neuerFall;
    private int neuerTodesfall;
    private String refDatum;
    private int neuGenesen;
    private int anzahlGenesen;
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private boolean istErkrankungsbeginn;
    private String altersGruppe2;

    public long getEventtime() {
        return meldedatum.getMillis();
    }

}

After some research I found out that the error is probably caused by different Jackson versions in the classpath.

I thought it would be possible to use my own version of Jackson, because Flink shades its own copy.

What am I doing wrong?

UPDATE: If I import the Jackson classes from the shaded Flink package, it works:

org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper

But that makes me dependent on Flink's shaded Jackson version.



1 Answer

It would work if Flink's classloader were used. However, with the way your setup works, your user code is simply loaded in the system classloader while the whole DataStream application is being created. I won't go into more detail here (unless requested in a follow-up) and will go straight to the solution:

Your DeserializationSchema should never initialize heavy resources during creation (this happens on the client or job manager side), but only in open (which happens on the task manager). So please move

  private val mapper = new CsvMapper
  mapper.registerModule(new JodaModule)

into open.
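
A minimal sketch of what that move could look like, assuming the `open(InitializationContext)` hook that `DeserializationSchema` offers as of Flink 1.11. The class name `CovidEventDeserializer` is made up for illustration, and only the deserialization side is shown; treat it as a starting point rather than a tested drop-in replacement.

```scala
import java.io.IOException

import com.fasterxml.jackson.databind.ObjectReader
import com.fasterxml.jackson.dataformat.csv.CsvMapper
import com.fasterxml.jackson.datatype.joda.JodaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical name; CovidEvent is the POJO from the question.
class CovidEventDeserializer extends DeserializationSchema[CovidEvent] {

  // Not shipped with the serialized schema; assigned in open().
  @transient private var reader: ObjectReader = _

  // Called before the first record is deserialized, so the Jackson
  // classes are only touched here and not in the constructor.
  override def open(context: DeserializationSchema.InitializationContext): Unit = {
    val mapper = new CsvMapper
    mapper.registerModule(new JodaModule)
    reader = mapper.readerWithSchemaFor(classOf[CovidEvent])
  }

  @throws[IOException]
  override def deserialize(message: Array[Byte]): CovidEvent =
    reader.readValue[CovidEvent](message)

  override def isEndOfStream(nextElement: CovidEvent): Boolean = false

  override def getProducedType: TypeInformation[CovidEvent] =
    TypeInformation.of(classOf[CovidEvent])
}
```

Because the constructor no longer creates a `CsvMapper`, building the job graph in `main` should not need to load the Jackson classes at all. If you also need the `SerializationSchema` side, the same pattern should apply to its own `open` method.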

It only works with the bundled version because, luckily for you, ObjectMapper implements Serializable. That is rarely the case for parsers, and it is actually completely unnecessary if the deserializer is initialized correctly.

