I'm always getting the following error.Can somebody help me please?
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at com.datastax.spark.connector.japi.DStreamJavaFunctions.<init>(DStreamJavaFunctions.java:24)
at com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions(CassandraStreamingJavaUtil.java:55)
at SparkStream.main(SparkStream.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 20 more
When I compile the following code. I've searched the web but didn't find a solution. I've got the error when I added the saveToCassandra.
import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
/**
* Created by jonas on 10/10/16.
*/
public class SparkStream implements Serializable{
public static void main(String[] args) throws Exception{
SparkConf conf = new SparkConf(true)
.setAppName("TwitterToCassandra")
.setMaster("local[*]")
.set("spark.cassandra.connection.host", "127.0.0.1")
.set("spark.cassandra.connection.port", "9042");
;
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(5000));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
Set<String> topics = Collections.singleton("Test");
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(
ssc,
String.class,
String.class,
kafka.serializer.StringDecoder.class,
kafka.serializer.StringDecoder.class,
kafkaParams,
topics
);
JavaDStream<Tweet> createTweet = directKafkaStream.map(s -> createTweet(s._2));
CassandraStreamingJavaUtil.javaFunctions(createTweet)
.writerBuilder("mykeyspace", "rawtweet", mapToRow(Tweet.class))
.saveToCassandra();
ssc.start();
ssc.awaitTermination();
}
public static Tweet createTweet(String rawKafka){
String[] splitted = rawKafka.split("\|");
Tweet t = new Tweet(splitted[0], splitted[1], splitted[2], splitted[3]);
return t;
}
}
My pom is the following.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.company</groupId>
<artifactId>Sentiment</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>twitter4j.org</id>
<name>twitter4j.org Repository</name>
<url>http://twitter4j.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10 -->
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>[4.0,)</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>4.0.4</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-async</artifactId>
<version>4.0.4</version>
</dependency>
</dependencies>
</project>
Question&Answers:os