Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

According the documentation is possible to tell Spark to keep track of "out of scope" checkpoints - those that are not needed anymore - and clean them from disk.

SparkSession.builder
  ...
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()

Apparently it does so but the problem, however, is that the last checkpointed rdds are never deleted.

Question

  • Is there any configuration I am missing to perform all cleanse?
  • If there isn't: Is there any way to get the name of the temporary folder created for a particular application so I can programatically delete it? I.e. Get 0c514fb8-498c-4455-b147-aff242bd7381 from SparkContext the same way you can get the applicationId
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
1.1k views
Welcome To Ask or Share your Answers For Others

1 Answer

I know its old question but recently i was exploring on checkpoint and had similar problems. Would like to share the findings.

Question :Is there any configuration I am missing to perform all cleanse?

Setting spark.cleaner.referenceTracking.cleanCheckpoints=true is working sometime but its hard to rely on it. official document says that by setting this property

clean checkpoint files if the reference is out of scope

I don't know what exactly it means because my understanding is once spark session/context is stopped it should clean it.

However, I found a answer to your below question

If there isn't: Is there any way to get the name of the temporary folder created for a particular application so I can programatically delete it? I.e. Get 0c514fb8-498c-4455-b147-aff242bd7381 from SparkContext the same way you can get the applicationId

Yes, We can get the checkpointed directory like below:

Scala :

//Set directory
scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")

scala> spark.sparkContext.getCheckpointDir.get
res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3

//It gives String so we can use org.apache.hadoop.fs to delete path 

PySpark:

// Set directory
>>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
>>> t = sc._jsc.sc().getCheckpointDir().get()
>>> t 
u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'

// notice 'u' at the start which means It returns unicode object
// Below are the steps to get hadoop file system object and delete

>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...