Aggregations
Using .agg, .groupBy, .pivot, and Window functions; and other tips
If you want to get an aggregated value over the entire dataframe, a simple .select() or .withColumn() statement with the function you want suffices. For example, df.withColumn("maximum", max('colname)).show or df.select(max('colname) as "maximum").show. More details are shown below.
However, if you want to define specific conditions for how the aggregation should work, it's a more complicated process. You have two options: using Windows, or using groupBy. Examples of both are shown below.
☞ IMPORTANT:
One thing to keep in mind: Window counts all values, it doesn't count distinct ones. In fact, you can't use countDistinct with Windows at all; it will error out. To count distinct values, you need to use groupBy().agg().
To illustrate, suppose you have a data frame with a row for each order a customer makes, and you want to count how many distinct customers you have. Using df.withColumn("cus_count", countDistinct('customer_id).over(window1)) will return an error, regardless of how you've defined your window1 beforehand. To resolve this, do df.groupBy("col1","col2").agg(countDistinct('customer_id) as "cus_count") where "col1" and "col2" are whatever columns you want to partition by. You can have as many partition columns as you need, with a minimum of one.
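A minimal sketch of the workaround; the ordersDF dataframe and the store_id, region, and customer_id columns are hypothetical:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._ // for the 'symbol column syntax

// Hypothetical orders dataframe: one row per order a customer makes.
// Count distinct customers per store and region:
val cusCounts = ordersDF
  .groupBy("store_id", "region")
  .agg(countDistinct('customer_id) as "cus_count")
```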
Aggregations Without Window
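A sketch of the kind of statement this section is about; the salesDF dataframe and its region, price, and quantity columns are hypothetical:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical salesDF with columns "region", "price", "quantity"
salesDF
  .groupBy("region")
  .agg(
    // a computation inside agg, nested functions, and a renamed result column:
    bround(avg('price * 'quantity), 2) as "avg_revenue",
    max('price) as "max_price"
  )
  .show
```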
A few things you notice here:
- You can perform a computation inside the agg function.
- You can nest functions.
- You can rename the result column simultaneously.
- If you don't use the groupBy statement, it will return the aggregation over the entire column, as a single value. Similar to doing DF.select(avg('col1)), for example.
- Use greatest and least instead of max and min respectively, if you want to skip the nulls. Read more on that in the official documentation.
- The bround function rounds to the selected decimal place. The official documentation says it rounds in half-even round mode if the selected scale is greater than or equal to 0, or at the integral part when the selected scale is less than 0.
Pivoting
You can also use .pivot on your groupBy() and agg() statement. How to do that is beautifully illustrated in this great answer.
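A quick hedged sketch; the df dataframe and the year, category, and amount columns are hypothetical:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// One output column per distinct value of "category", each holding
// the sum of "amount" for that (year, category) pair
df.groupBy("year")
  .pivot("category")
  .agg(sum('amount))
  .show
```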
Quick Aggregations for the Whole Column(s)
DF.select(avg('col1) as "theAvg") gives the average of the whole column, as a single number, in a dataframe of one row and one column. Like this,
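A sketch, with an illustrative value in the output:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

DF.select(avg('col1) as "theAvg").show
// +------+
// |theAvg|
// +------+
// | 42.75|
// +------+
```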
💡Tip - Extracting the Value as a Certain Type for Further Use
Say you want to be more fancy: round it to two decimal places, and extract it as a value to use in further applications like filtering, in computations, or in a function. For example, filter the dataframe to show the whole row that contains the maximum value of a certain column, rather than just that number on its own (covered in the "Filtering & Nulls" pages of this book). Then you need the bround function to round to 2 decimal places, and the first function to grab the first row of the result (the only row in this case), then select the first element by indexing right into first. Like so, DF.select(bround(avg('col1), 2) as "theAvg").first()(0)
But we know that Spark Scala is very particular about types; so what if we don't want the result to be of type Any? Then we need to change it to another numeric type. Here, I'll use Double, since it already has a decimal. I do that by using the getDouble function on first.
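A sketch of both forms, using the same DF and col1 as above:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Indexing into the Row returns Any:
DF.select(bround(avg('col1), 2) as "theAvg").first()(0)

// getDouble returns a Scala Double instead:
val theAvg: Double = DF.select(bround(avg('col1), 2) as "theAvg").first().getDouble(0)
```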
Chaining
Of course, you can chain the aggregations within the same select statement,
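For example, a sketch following the same DF and col1:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

DF.select(
  bround(avg('col1), 2) as "theAvg",
  max('col1) as "theMax"
).show
```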
Now, if you need to grab the second element, the max in our case, from this one-row dataframe, follow the same logic using first, like so: DF.select(bround(avg('col1), 2) as "theAvg", max('col1) as "theMax").first()(1)
You can also convert it to whatever type you want; I'll convert to IntegerType here,
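A sketch; getInt assumes the underlying column is already of an integer type (otherwise use the matching getter, e.g. getLong or getDouble):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Grab the second element (index 1) of the single row, as an Int:
val theMax: Int = DF
  .select(bround(avg('col1), 2) as "theAvg", max('col1) as "theMax")
  .first()
  .getInt(1)
```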
There are many other types you can convert to. If you type "get" after the dot on first, then press the tab button on your keyboard in the spark-shell terminal, you will see all your available options. Here they are,
get, getDate, getInt, getMap, getTimestamp, getAs, getDecimal, getJavaMap, getSeq, getValuesMap, getBoolean, getDouble, getList, getShort, getByte, getFloat, getLocalDate, getString, getClass, getInstant, getLong, getStruct
For the most part, they're self-explanatory.
💡 Practical Tip - `getClass.getName`
The other one I use a lot from the above list is getClass, because it returns the type of the object; and it has many functions within it. The one I use frequently is getClass.getName, as it returns the type and class of the object, i.e. which package it belongs to. In our example we get,
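A sketch; the java.lang.Double result is illustrative, since indexing into a Row returns a boxed value:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

DF.select(bround(avg('col1), 2) as "theAvg").first()(0).getClass.getName
// res: String = java.lang.Double
```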
💡Tip - Getting Most Repeated Values, and Count of All Distinct Values
Show most repeated value in a column, in a descending order by count
df.groupBy("colName").count.sort('count.desc).show
Show distinct values count in a column
df.select("colName").distinct.count
Show the first 100 distinct values that show up in a column
df.select("colName").distinct.show(100)
Aggregations With Window
This will return a new column having the wanted aggregation over the chosen window specification. Chain as many withColumn clauses as you need.
From this answer.
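A minimal sketch of the pattern, along the lines of the cited answer; the window spec and the customer_id, order_date, and amount columns are hypothetical:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Per-customer window, ordered by date
val window1 = Window.partitionBy("customer_id").orderBy("order_date")

df.withColumn("running_max", max('amount).over(window1))
  .withColumn("running_avg", avg('amount).over(window1))
  .show
```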
You can use greatest and least instead of max and min to skip nulls. Read about them in the docs.
Notice that this is different than just finding, say, the maximum value of a column, which is just one number. You can get that with a simple select statement, like so: df.selectExpr("max(colname)").show or df.select(max('colname)).show
☞ Find all aggregate functions and window functions in the official Spark Scala API docs for the functions package, under the sections "aggregate functions" and "window functions" respectively.
☞ There are also other options to refine your window specification, like rangeBetween and rowsBetween. The main difference being:
- rowsBetween is based on the position of the row within the partition, with respect to the current row
- rangeBetween is based on the actual value of the ORDER BY expression(s)
Expand their sections on Spark Scala 2.4.4 Official Docs - Window methods to get more details.
Lastly, the current row is already included if you specify your window as, say, (-2, 2) in either rowsBetween or rangeBetween; i.e. you would be taking 2 days ago, the current row, and 2 days after, if you're using a date column for your partitioning and ordering.
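A sketch contrasting the two; it assumes a numeric day column, since rangeBetween works on the ORDER BY expression's actual values:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Positional: the 2 rows before through the 2 rows after the current row
val byRows  = Window.partitionBy("store_id").orderBy("day").rowsBetween(-2, 2)

// Value-based: all rows whose "day" is within +/- 2 of the current row's "day"
val byRange = Window.partitionBy("store_id").orderBy("day").rangeBetween(-2, 2)

df.withColumn("avg_rows",  avg('sales).over(byRows))
  .withColumn("avg_range", avg('sales).over(byRange))
```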
When To Use Either
You will soon be faced with the question: when do I use groupBy().agg(), and when do I use withColumn("colname", some_function.over(my_window))?
The decision is ultimately up to you; experiment with a toy data frame and see for yourself which does what. I can only offer you my two cents from my experience so far.
I have this general rule in my head,
Most of the time groupBy().agg() gets the job done beautifully; you end up with only the columns you grouped by and those you aggregated. It's a clear, easy-to-follow, no-mess-no-fuss outcome. Bear in mind that this process gets you the aggregations over the entire column(s) you grouped by.
Thus, if you need some control over the rows, or you need any operations between the rows of certain columns, such as subtracting or averaging two successive rows, you need to go through the meticulous process of creating your window. Refer to the "Window" page of the Spark Scala page group of this guidebook for even more details about windowing.
The columns in a groupBy statement are basically partitionBy in a Window.
I use Window when I need to, say, differentiate two rows in the data frame. In Pandas that would be shift and diff; and I think there's rolling too in Pandas, to find the rolling average over a defined number of rows.
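A sketch of the shift/diff equivalent in Spark, using lag; the customer_id, order_date, and amount columns are hypothetical:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val w = Window.partitionBy("customer_id").orderBy("order_date")

// Difference between the current row's amount and the previous row's,
// within each customer's partition (null for the first row):
df.withColumn("diff", 'amount - lag('amount, 1).over(w))
```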
I also use Windows when I want to keep all the other columns in the data frame, and add an aggregated column to them. In this case, you will see the newly added column having repeated values filling up the rows of the same partition defined in the Window specification you set up.
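A sketch of that pattern, with the same hypothetical columns as above:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Every row keeps its original columns, plus a per-customer total
// that repeats across all rows of the same customer_id partition:
val w2 = Window.partitionBy("customer_id")
df.withColumn("total_spend", sum('amount).over(w2))
```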