I have the following parquet file:

+--------------+------------+-------+
|gf_cutoff     | country_id |gf_mlt |
+--------------+------------+-------+
|2020-12-14    |DZ          |5      |
|2020-08-06    |DZ          |4      |
|2020-07-03    |DZ          |4      |
|2020-12-14    |LT          |1      |
|2020-08-06    |LT          |1      |
|2020-07-03    |LT          |1      |
+--------------+------------+-------+

As you can see, it is partitioned by country_id and ordered by gf_cutoff descending. What I want to do is compare gf_mlt to check whether the value has changed. To do that, I want to compare the row with the most recent gf_cutoff to the second most recent one.
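
To reproduce, the DataFrame is read from the parquet file roughly like this (the path below is hypothetical):

// hypothetical path, just to show how df is obtained
val df = spark.read.parquet("/path/to/gf_file.parquet")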

An example of this case would be comparing:

 2020-12-14 DZ 5
with
 2020-08-06 DZ 4

Then I want to add two new columns: one with the previous value (old_value), and one (Has_change) that is True if the value has changed and False if it has not, keeping the most recent value (5 for DZ) in gf_mlt. After making this comparison, I want to delete the older rows.

For DZ the value has changed, and for LT it hasn't changed because it is always 1.

So the output would be like this:

+--------------+------------+-------+------------+-----------+
|gf_cutoff     | country_id |gf_mlt | Has_change | old_value |
+--------------+------------+-------+------------+-----------+
|2020-12-14    |DZ          |5      |    True    |     4     |
|2020-12-14    |LT          |1      |    False   |     1     |
+--------------+------------+-------+------------+-----------+

If you need more explanation, just tell me.

question from:https://stackoverflow.com/questions/65842685/compare-two-values-with-scala-spark

1 Answer

You can use lag over an appropriate window to get the previous value, and then keep only the most recent row per country using row_number over another appropriately ordered window:
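
For a self-contained test, the sample data above can be recreated as follows (a minimal sketch, assuming gf_cutoff is a string date, gf_mlt is an integer, and spark is an active SparkSession):

import spark.implicits._  // enables .toDF on Seq

val df = Seq(
  ("2020-12-14", "DZ", 5),
  ("2020-08-06", "DZ", 4),
  ("2020-07-03", "DZ", 4),
  ("2020-12-14", "LT", 1),
  ("2020-08-06", "LT", 1),
  ("2020-07-03", "LT", 1)
).toDF("gf_cutoff", "country_id", "gf_mlt")

ISO-formatted date strings sort correctly even as strings, so they work in the window orderBy below.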

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._  // lag, row_number, desc
import spark.implicits._                 // enables the $"colName" syntax

val df2 = df.withColumn(
    "last_value",
    // previous (older) gf_mlt within each country, ordered by date ascending;
    // null if there is no earlier row
    lag("gf_mlt", 1).over(Window.partitionBy("country_id").orderBy("gf_cutoff"))
).withColumn(
    "rn",
    // rank rows newest-first so rn = 1 is the most recent cutoff per country
    row_number().over(Window.partitionBy("country_id").orderBy(desc("gf_cutoff")))
).filter("rn = 1").withColumn(
    "changed",
    // true when the latest value differs from the previous one
    $"gf_mlt" =!= $"last_value"
).drop("rn")

df2.show
+----------+----------+------+----------+-------+
| gf_cutoff|country_id|gf_mlt|last_value|changed|
+----------+----------+------+----------+-------+
|2020-12-14|        DZ|     5|         4|   true|
|2020-12-14|        LT|     1|         1|  false|
+----------+----------+------+----------+-------+
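
If you want the exact column names from the question (Has_change and old_value), a final rename is enough (a minimal sketch):

val result = df2
  .withColumnRenamed("changed", "Has_change")
  .withColumnRenamed("last_value", "old_value")

This yields one row per country with the most recent gf_mlt, the previous value, and the change flag, matching the requested output.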
