I have thousands of small files in HDFS. Need to process a slightly smaller subset of files (which is again in thousands), fileList contains list of filepaths which need to be processed.
// fileList == list of filepaths in HDFS
var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD
for (i <- 0 to fileList.size() - 1) {
val filePath = fileStatus.get(i)
val fileRDD = sparkContext.textFile(filePath)
val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line))
masterRDD = masterRDD.union(sampleRDD)
}
masterRDD.first()
//Once out of loop, performing any action results in stackoverflow error due to long lineage of RDD
Exception in thread "main" java.lang.StackOverflowError
at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
=====================================================================
=====================================================================
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
See Question&Answers more detail:os