Window

Aggregations over parts of the data frame, computed row by row in a certain order

The following is the basic syntax I use Window with; it has proved useful most of the time. I don't always include rowsBetween(), but I think it's a better idea to do so,

import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.col

// partition by device and day, order each partition by time ascending,
// and aggregate from the start of the partition up to the current row
val window1 = Window.partitionBy("DeviceId", "day").
  orderBy(col("time").asc).
  rowsBetween(Window.unboundedPreceding, Window.currentRow)

So here I partitioned on two columns and ordered each partition by a TimestampType column, in ascending order. You can order in descending order by replacing asc with desc in the orderBy clause.
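For instance, a minimal sketch of applying window1 as a running aggregate could look like the following; it assumes a data frame df that, besides "DeviceId", "day", and "time", has a hypothetical numeric column "reading",

import org.apache.spark.sql.functions.{col, sum}

// running total of "reading" within each (DeviceId, day) partition,
// accumulated in time order thanks to rowsBetween(unboundedPreceding, currentRow)
val dfRunning = df.withColumn("runningSum", sum(col("reading")).over(window1))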

There are other methods on Window you can add, like rangeBetween. The official docs (https://spark.apache.org/docs/2.4.3/api/scala/index.html#org.apache.spark.sql.expressions.WindowSpec) offer some explanation of how to use rowsBetween with unboundedPreceding, unboundedFollowing, and currentRow. I didn't find the examples given there sufficient for some of my use cases, but I encourage you to read the docs, understand the difference between rowsBetween and rangeBetween, and experiment on your own, because your use case might need them.
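As a rough sketch of the difference: rangeBetween looks at the values of the ordering column rather than a count of rows. Assuming a hypothetical numeric column "epochSeconds" (say, the timestamp cast to seconds), something like this would aggregate over the last 300 seconds of data instead of the last 300 rows,

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// a value-based frame: all rows whose "epochSeconds" lies within 300 units
// below the current row's value, regardless of how many rows that is
val rangeWindow = Window.partitionBy("DeviceId").
  orderBy(col("epochSeconds").asc).
  rangeBetween(-300, Window.currentRow)

val dfTrailing = df.withColumn("trailingCount", count(lit(1)).over(rangeWindow))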

💡 A Word of Advice

Ordering matters! Pay special attention to which column you're ordering by, and how (ascending or descending). Always test every single thing before you run with it. Many times, commands don't behave as you expect them to. That also applies to the logic in filters and conditionals. Experiment and test before you trust.

Example

Windows are most useful for controlling the rows you aggregate over, rather than taking whole partitions as groupBy would. They are particularly handy with the lag and lead functions to find, say, the difference between two (or more) successive rows. You'd apply it like so,

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val window1 = Window.partitionBy("customerID", "day").
  orderBy(col("time").asc)

// bring the next row's value of "colname" up to the current row, then take the difference
var df1 = df0.
  withColumn("shifted", lead("colname", 1).over(window1)).
  withColumn("difference", col("shifted") - col("colname"))

// to keep it clean, you can drop the shifted column
df1 = df1.drop("shifted")

What happens here is that, within each "customerID" and "day" combination, the rows are ordered by the "time" column, and for every row the current value of "colname" is subtracted from the value of "colname" in the next row of the same partition (window).

lead brings the next row's value up to your current row, so you're subtracting current from next; with lead, the null ends up in the last row of each partition. lag, on the other hand, pushes the current row's value down to the next one, leaving a null in the first row position of the "shifted" column. Search the Spark Scala 2.4.4 official docs (functions page) for lag and lead for more details.

Experiment with a toy data frame to see the effects of Window with different specifications.
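For example, here's a minimal toy experiment; it assumes you're in spark-shell (or have a SparkSession called spark with its implicits imported), and reuses the column names from the example above,

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

// one customer, one day, three readings
val toy = Seq(
  ("c1", "2019-01-01", 1, 10.0),
  ("c1", "2019-01-01", 2, 15.0),
  ("c1", "2019-01-01", 3, 30.0)
).toDF("customerID", "day", "time", "colname")

val w = Window.partitionBy("customerID", "day").orderBy(col("time").asc)

toy.
  withColumn("next", lead("colname", 1).over(w)).     // null in the last row of each partition
  withColumn("previous", lag("colname", 1).over(w)).  // null in the first row of each partition
  show()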
