Aggregations

Using .agg .groupBy .pivot and Window functions; and other tips

If you want to get an aggregated value over the entire dataframe, a simple .select() statement with the function you want suffices. For example, df.select(max('colname) as "maximum").show . (Note that a bare aggregate inside .withColumn , without a window specification, will error out; windows are covered below.) More details are shown below.

However, if you want to define specific conditions for how the aggregation should work, then it's a more complicated process. You have two options: using Window functions, or using groupBy. Examples of both are shown below.

IMPORTANT: One thing to keep in mind: a Window counts all values, it doesn't count distinct ones. In fact, you can't use countDistinct with Windows at all; it will error out. To count distinct values, you need to use groupBy().agg() . To illustrate, suppose you have a data frame with a row for each order a customer makes, and you want to count how many distinct customers you have. Using df.withColumn("cus_count", countDistinct('customer_id).over(window1)) will return an error, regardless of how you've defined window1 beforehand. To resolve this, do df.groupBy("col1","col2").agg(countDistinct('customer_id) as "cus_count") where "col1" and "col2" are whatever columns you want to partition by. You can have as many partition columns as you need, with a minimum of one.
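To make that concrete, here is a minimal sketch with a toy orders data frame; the column names are made up for the example (run it in spark-shell, or add import spark.implicits._ first):

import org.apache.spark.sql.functions._

// toy data: one row per order (hypothetical columns)
val orders = Seq(
  ("storeA", "2020-01-01", "cust1"),
  ("storeA", "2020-01-01", "cust1"),
  ("storeA", "2020-01-02", "cust2"),
  ("storeB", "2020-01-01", "cust3")
).toDF("store", "day", "customer_id")

// countDistinct works inside .agg, giving distinct customers per store and day
orders.groupBy("store", "day").agg(countDistinct('customer_id) as "cus_count").show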

Aggregations Without Window

import org.apache.spark.sql.functions._

val df2 = df1.groupBy("Id","day").agg(
    bround(avg("col1"), 2) as "roundedCol1", 
    max("col4") as "maxCol4",
    bround( max("col2") - min("col2")  ,1) as "range")

A few things to notice here,

  • You can perform a computation inside the agg function

  • You can nest functions

  • You can rename the result column simultaneously

  • If you didn't use the groupBy statement, it would return the aggregation over the entire column, as a single value. Similar to doing DF.select(avg('col1)) for example.

  • Use greatest and least instead of max and min if you want the largest or smallest value across several columns within the same row, skipping nulls. Read more on that in the official documentation.

The bround function rounds to the selected decimal place. The official documentation says it rounds in half-even mode if the selected scale is greater than or equal to 0, or at the integral part when the selected scale is less than 0.
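A quick illustration of the half-even behaviour, with made-up values (run in spark-shell, or add import spark.implicits._ ):

// at scale 0, 2.5 rounds down to 2 and 3.5 rounds up to 4 (ties go to the even digit)
Seq(2.5, 3.5).toDF("x").select('x, bround('x, 0) as "rounded").show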

Pivoting

You can also use .pivot with your groupBy() and agg() statement. How to do that is beautifully illustrated in this great answer.
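For a flavour of it, a minimal sketch (the "amount" column here is hypothetical): grouping by Id and pivoting on day turns every distinct day value into its own column, filled with the aggregated amount.

val pivoted = df1.groupBy("Id").pivot("day").agg(sum("amount"))
pivoted.show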

Quick Aggregations for the Whole Column(s)

DF.select(avg('col1) as "theAvg") gives the average of the whole column, as a single number, in a dataframe of one row and one column. Like this,
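(The value below is made up, just to illustrate the shape of the output.)

DF.select(avg('col1) as "theAvg").show
// +------+
// |theAvg|
// +------+
// | 12.34|
// +------+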

💡 Tip - Extracting the Value, as a Certain Type, for Further Use

Say you want to be fancier: round it to two decimal places, and extract it as a value to use in further applications such as filtering, computations, or a function. For example, filter the dataframe to show the whole row that contains the maximum value of a certain column, rather than just that number on its own (covered in the "Filtering & Nulls" pages of this book). Then you need the bround function to round to 2 decimal places (or whatever precision you want), and the first function to grab the first row of the result (the only row in this case), then select the first element by indexing right into first . Like so, DF.select(bround(avg('col1) ,2) as "theAvg").first()(0)

But we know that Spark Scala is very particular about types; so what if we don't want the result to be of type Any? Then we need to change it to another numeric type. Here I'll use Double, since the average already has a decimal part. I do that by using the getDouble function on first .
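A minimal sketch of that, reusing the earlier example (it assumes col1 is a numeric column, so the average comes back as a Double):

// getDouble(0) grabs the first element of the first (and only) row as a Scala Double
val theAvg: Double = DF.select(bround(avg('col1), 2) as "theAvg").first().getDouble(0)

// which can then be used directly, e.g. in a filter or a computation
DF.filter('col1 > theAvg).show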

Chaining

Of course you can chain the aggregations within the same select statement,
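Like so (the output values are made up):

DF.select(bround(avg('col1), 2) as "theAvg", max('col1) as "theMax").show
// +------+------+
// |theAvg|theMax|
// +------+------+
// | 12.34|  42.0|
// +------+------+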

Now, if you need to grab the second element, the max in our case, from this one row dataframe, follow the same logic using first like so, DF.select(bround(avg('col1) ,2) as "theAvg", max('col1) as "theMax").first()(1)

You can also convert it to whatever type you want; I'll convert to an integer here.
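A sketch of one way to do that. getInt only works if the underlying value is actually stored as an integer, so if col1 is a double column, grab it with getDouble and convert with .toInt (this, like the column names, is an assumption for the example):

// if col1 is an integer column, its max is an integer too, so getInt works directly
val theMax: Int = DF.select(bround(avg('col1), 2) as "theAvg", max('col1) as "theMax").first().getInt(1)

// if col1 is a double column instead
val theMaxAsInt: Int = DF.select(max('col1) as "theMax").first().getDouble(0).toInt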

There are many other types you can convert to. If you type "get" after the dot on first , then press the Tab key in the spark-shell terminal, you will see all your available options. Here they are,

get, getDate, getInt, getMap, getTimestamp, getAs, getDecimal, getJavaMap, getSeq, getValuesMap, getBoolean, getDouble, getList, getShort, getByte, getFloat, getLocalDate, getString, getClass, getInstant, getLong, getStruct

For the most part, they're self explanatory.

💡 Practical Tip - `getClass.getName`

The other one I use a lot from the above list is getClass , because it returns the type of the object. And it has many functions of its own within it.

The one I use frequently is getClass.getName , as it returns the type and class of the object, i.e. to which package it belongs. In our example we get,
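(The output line below is what I would expect here, since first()(0) hands back the average boxed as a java.lang.Double .)

DF.select(bround(avg('col1), 2) as "theAvg").first()(0).getClass.getName
// res0: String = java.lang.Double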

💡 Tip - Getting Most Repeated Values, and Count of All Distinct Values

  • Show the most repeated values in a column, in descending order by count df.groupBy("colName").count.sort('count.desc).show

  • Show distinct values count in a column df.select("colName").distinct.count

  • Show the first 20 distinct values that show up in a column df.select("colName").distinct.show (pass a number, e.g. .show(100) , to display more)

Aggregations With Window

This will return a new column, having the wanted aggregation over the chosen window specification. Chain as many withColumn clauses as you need. Note that when the window specification includes an orderBy (as below), the default frame runs from the start of the partition up to the current row, so aggregations like sum become running aggregations; drop the orderBy, or set rowsBetween explicitly, if you want the aggregation over the whole partition.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window1 = Window.partitionBy("Id", "day").orderBy($"time".asc)


val df2 = df1.
withColumn("maxCol1", max($"col1").over(window1) ).
withColumn("sumCol1", sum('col1).over(window1) )

From this answer.

Again, you can use greatest and least (rather than max and min ) to get the largest or smallest value across several columns in the same row, skipping nulls. Read on them in the docs.

Notice that this is different from just finding, say, the maximum value of a column, which is just one number. You can get that with a simple select statement, like so, df.selectExpr("max(colname)").show or df.select(max('colname)).show

☞ Find all aggregate functions and window functions in the official Spark Scala API docs for the functions package, under the sections "aggregate functions" and "window functions" respectively.

☞ There are also other options to refine your window specification, like rangeBetween and rowsBetween . Main difference being,

  • rowsBetween is based on the position of the row within the partition; with respect to the current row

  • rangeBetween is based on the actual value of the ORDER BY expression(s)

Expand their sections in the Spark Scala 2.4.4 Official Docs - Window methods to get more details.

Lastly, the current row is already included if you specify your frame as, say, (-2, 2) in either rowsBetween or rangeBetween . With rowsBetween that means the two rows before, the current row, and the two rows after; with rangeBetween ordered by a date-like column it means from two days before up to two days after the current row's value.
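A sketch contrasting the two, assuming a numeric ordering column (call it day_num , a made-up column holding the day as a number) so that rangeBetween has something it can do arithmetic on:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// by position: 2 rows before through 2 rows after the current row
val byRows  = Window.partitionBy("Id").orderBy('day_num).rowsBetween(-2, 2)

// by value: all rows whose day_num is within +/- 2 of the current row's day_num
val byRange = Window.partitionBy("Id").orderBy('day_num).rangeBetween(-2, 2)

val df3 = df1.
  withColumn("rollingAvgRows",  avg('col1).over(byRows)).
  withColumn("rollingAvgRange", avg('col1).over(byRange))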

When To Use Either

You will soon be faced with the question: when do I use groupBy().agg() and when do I use withColumn("colname", some_function.over(my_window)) ? The decision is ultimately up to you; experiment with a toy data frame and see for yourself which does what. I can only offer you my two cents from my experience so far.

I have this general rule in my head: most of the time groupBy().agg() gets the job done beautifully; you end up with only the columns you grouped by and those you aggregated. It's a clear, easy to follow, no mess no fuss outcome. Bear in mind that this process gets you the aggregations over the entire column(s) you grouped by. Thus, if you need some control over the rows, or you need any operations between the rows of certain columns, such as subtracting or averaging two successive rows, you need to go through the meticulous process of creating your window. Refer to the "Window" page of the Spark Scala page group of this guidebook for even more details about windowing. The columns in the groupBy statement are basically partitionBy in Window.

I use Window when I need to, say, take the difference between two rows in the data frame. In Pandas that would be shift and diff , and there's also rolling in Pandas to find the rolling average over a defined number of rows; a sketch of the Spark equivalent is shown below. I also use Windows when I want to keep all the other columns in the data frame, and add an aggregated column to them. In this case, you will see the newly added column having repeated values filling up the rows of the same partition defined in the Window specification you set up.
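For instance, the Pandas shift / diff pattern translates to lag over a window; a sketch, reusing the hypothetical df1 and its columns from above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("Id").orderBy($"time".asc)

// previous row's col1 within the same Id, and the row-to-row difference
val df4 = df1.
  withColumn("prevCol1", lag('col1, 1).over(w)).
  withColumn("diffCol1", 'col1 - lag('col1, 1).over(w))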
