User Defined Functions - UDF
Applying your own functions to a data frame column, and registering a function as a User Defined Function (UDF) with the Spark session.
We saw in the Spark Scala Fundamentals page how to write a general-purpose function in Spark Scala, but most of the time we need a custom function to apply to a column in a data frame. That is, a function that takes columns as input arguments and populates an output column; it therefore operates row-wise, on a different set of input values for each row.
To do that, we write the function, then register it with the Spark session before we can apply it. Here's the general structure.
The `_)` at the end of step 2 tells Spark your function takes arguments; if you leave it out, Spark throws an error that specifically tells you to add it.
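The steps above can be sketched roughly as follows. All names here (`addOne`, `newCol`, `oldCol`) are illustrative assumptions, and the Spark-dependent lines are shown as comments since they need a `SparkSession` in scope:

```scala
// Step 1: write an ordinary Scala function.
def addOne(x: Int): Int = x + 1

// Step 2: register it with the Spark session. The trailing `_` turns the
// method into a function value; Spark errors without it.
// val addOneUDF = spark.udf.register("addOne", addOne _)

// Step 3: apply it to a data frame column.
// df = df.withColumn("newCol", addOneUDF('oldCol))
```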
Here's an example of how I used it for my use case.
The input arguments represent three columns in the data frame I intend to use it on.
I add local variables of type `var` to work with inside the function. Remember, `var` is mutable (changeable); `val` isn't. Use `var` in loops, in functions, and wherever you want to keep transforming the same data frame. I had to give each local variable some value, not necessarily a meaningful initial value: Scala requires local variables to be initialized when declared, otherwise the function errors out.
The function's return value is what fills the rows of the column you add in the next step.
One of the input columns is the lagged version of another input column; that's how I do computations or comparisons between successive rows. Read more on lagging in the "Operations on One Column" page of this book, under the "shifting a column" section.
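Putting the notes above together, here is a hypothetical sketch of such a function. Only the argument names mirror the example later on this page; the stage-change logic is an illustrative assumption, not my actual code:

```scala
// Hypothetical: compare a stage column to its lagged (previous-row) version.
def duration(fcurStg: Int, lagCurStg: Int, unixTime: Long): Long = {
  // Local vars must be initialized when declared.
  var result: Long = 0L
  // A difference from the lagged value means the stage changed on this row.
  if (fcurStg != lagCurStg) {
    result = unixTime // e.g. record the timestamp at the stage change
  }
  result // this value fills the new column, row by row
}
```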
When registering your function with Spark, the name you register it under and the value you assign the registration to can be the same, which avoids confusion when you apply it.
When you apply your function, you can pass arguments the Python way: either positionally, in order, with no need for `=`, or by explicitly naming the arguments. For example, I could have done `df = df.withColumn("cycDur", duration(fcurStg = 'fcurStg, LagCurStg = 'LagCurStg, unixTime = 'unixTime))`.
Here I used the single quote to reference a column rather than using the `col` function. You could also use `$"colName"`, as you saw in the "Accessing Variables in Data Frames" page of this book. I also used the column names as the function's argument names to avoid confusing myself.
Another Way
No need to register it, but it must be defined row-wise.
I haven't personally used this method, but it works. You can follow the answer in the link with the example data frame it provides. That answer includes some other functions and discusses windows, which I cover in a later dedicated page of this book.
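If I understand the method correctly, it amounts to wrapping a function with `udf(...)` and applying it directly, with no `spark.udf.register` call. A minimal sketch under that assumption (names are hypothetical; the Spark lines need a `SparkSession` and are shown as comments):

```scala
// A plain Scala lambda, defined row-wise over single values.
val toUpperSafe: String => String = s => if (s == null) null else s.toUpperCase

// Wrap it directly; no registration with the session is needed:
// import org.apache.spark.sql.functions.udf
// val toUpperUDF = udf(toUpperSafe)
// df.withColumn("nameUpper", toUpperUDF($"name"))
```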
☞ Resources and Notes
Read this source for the mechanics, notes, and for defining a lambda function, i.e. a one-line function. See Databricks for more details and null handling. Don't forget you can select which rows to drop using `.filter` and `df.na.drop`. There are also `.fill` and `.replace` on the `.na` accessor.
Reading the two sources shows the mechanics of UDFs: a UDF is opaque to Spark's optimizer, so the optimizations I relied on are disregarded, and filters may or may not be applied before the UDF runs, especially for nulls. Nulls must be handled in case expressions (if-else) or inside the written function itself.
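Handling the null inside the function itself can be sketched like this (hypothetical names; the registration line needs Spark and is commented):

```scala
// Guard against null inside the function, rather than relying on a .filter
// that the optimizer may or may not push ahead of the UDF.
def safeLength(s: String): Int =
  if (s == null) 0 else s.length

// val safeLengthUDF = udf(safeLength _)
```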
UDF for Element-Wise Operations on ArrayType Columns
You can also use a UDF to do element-wise transformations on ArrayType columns. I haven't tested that in Spark Scala myself, only in PySpark, though I'm sure it works the same. Refer to the PySpark page group, then the UDF page of this notebook, for details and examples.
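In Scala, an ArrayType column arrives inside the UDF as a `Seq`, so the element-wise work is an ordinary `map`. A sketch (names assumed; Spark lines commented, as they need a session):

```scala
// Element-wise transformation of an array column's contents.
def doubleEach(xs: Seq[Int]): Seq[Int] = xs.map(_ * 2)

// val doubleEachUDF = udf(doubleEach _)
// df.withColumn("doubled", doubleEachUDF($"nums"))
```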