User Defined Functions - UDF

Applying your own functions to a data frame column & registering a function as a User Defined Function (UDF) with the Spark session

We saw in the Spark Scala Fundamentals page how to write a general-purpose function in Spark Scala, but most of the time we need a custom function to apply to a column in a data frame. That is, a function that takes columns as input arguments and whose output populates a new column; it operates row-wise, on the values each input column holds for that row.

To do that, we write the function, then register it with the Spark session before we can apply it. Here's the general structure:

//1. the function
def funcName(col1: Type, col2: Type /*, ... */): OutputColType = {
    /*
    insert your statements here.
    don't forget to use `var` for intermediate values needed.
    */
    return wantedValue
}

//2. register your function as a UDF with Spark
val whatever = spark.udf.register("name_youll_use", funcName _)

//3. apply it in the `withColumn` statement as you're used to;
//   in the DataFrame API you call the returned val, not the registered string name
df = df.withColumn("newCol", whatever(input_columns))

The `funcName _` (with the trailing underscore) in step 2 turns your method into a function value that Spark can register; if you leave the underscore out, the compiler throws an error that tells you specifically to add it.
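As a side note, the string name and the returned val serve different purposes: the string name is what you'd use inside Spark SQL expressions, while the returned val is what you call in the DataFrame API. Here's a minimal sketch; the to_upper function, the people view, and the name column are made up for illustration.

// Minimal sketch: registered string name vs. returned val.
// `to_upper`, `toUpper`, the `people` view, and the `name` column are hypothetical.
import org.apache.spark.sql.functions.col

val toUpper = spark.udf.register("to_upper", (s: String) => s.toUpperCase)

// DataFrame API: call the returned val
val withUpper = df.withColumn("upper_name", toUpper(col("name")))

// SQL: use the registered string name
df.createOrReplaceTempView("people")
spark.sql("SELECT to_upper(name) AS upper_name FROM people")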

Here's an example of how I used it for my use case:

  • The input arguments represent three columns in the data frame I intend to use it on.

  • I add local variables declared with var to work with inside the function. Remember, var is mutable (changeable), val isn't. Use var in loops, in functions, and wherever you want to keep transforming the same data frame.

  • I had to initialize each local variable with some value, not necessarily a meaningful initial value; it seems you must give local variables a value when you declare them, otherwise the code errors out.

  • The returned value of the function is the value that is going to fill the rows of the columns you will add in the next step.

  • One of the input columns in the arguments is the lagged version of another input column. That's how I do computations or comparisons between successive rows (see the sketch after the example code below for how such a lagged column could be built). Read more on lagging in the "Operations on One Column" page of this book, under the "shifting a column" section.

  • While registering your function with Spark, the string name you register and the val you assign the registration to can be given the same name, to avoid confusion when you apply it.

  • When you apply your function, pass the input column references in order, much as you would positional arguments in Python. e.g. I could have written df = df.withColumn("cycDur", duration('fcurStg, 'LagCurStg, 'unixTime)). Here, I used the single quote to reference a column rather than the col function. You could also write $"colName", as you saw in the "Accessing Variables in Data Frames" page of this book; the sketch after the example code below uses that style.

  • I also gave the function's input arguments the same names as the columns, to avoid confusing myself.

//1. the function
def cycleDuration(fcurStg: Long, LagCurStg: Long, unixTime: Long): Long = {
    var end: Long = 0
    var start: Long = 0
    if (fcurStg > LagCurStg && LagCurStg == 0) {
        start = unixTime
    } else if (fcurStg < LagCurStg && fcurStg == 0) {
        end = unixTime
    } else {
        start = 0
        end = 0
    }
    var dur: Long = end - start
    return dur
}

//2. register it as a UDF
val duration = spark.udf.register("duration", cycleDuration _)

//3. apply the function
df = df.withColumn("cycDur", duration(col("fcurStg"), col("LagCurStg"), col("unixTime")))

Another Way

With this approach there's no need to register the function with the Spark session, but it must be defined to work row-wise, wrapped directly in udf.

//1. the function, wrapped directly in `udf`
import org.apache.spark.sql.functions.udf
import spark.implicits._   // for the $"colName" syntax below

def adjustAndZip = udf(
  (row: Seq[Long], lag: Seq[Int], isChild: Boolean) => {
    // if the row is a child, repeat the first lag value; otherwise keep the lags as-is
    val adjustedLag = if (isChild) Seq.fill[Int](lag.length)(lag.head) else lag
    // pair each row id with its (possibly adjusted) lag value
    row zip adjustedLag
  })

//2. apply the function
groupedDF.withColumn("temp",
  adjustAndZip($"row_id_list", $"col1_lag_list", $"is_child")
)

source

I haven't personally used this method, but it works. You can follow the answer in the link above with the example data frame it provides. The answer includes some other functions and touches on window functions, which I cover in a dedicated page later in this book.
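To connect this back to the earlier example, here's a hedged sketch of what the same cycleDuration function would look like with this approach, skipping the registration step entirely. The name cycleDurationUdf is mine, not from the original code.

// Hypothetical sketch: wrapping the earlier cycleDuration function with udf()
// instead of registering it through spark.udf.register.
import org.apache.spark.sql.functions.{udf, col}

val cycleDurationUdf = udf(cycleDuration _)

df = df.withColumn("cycDur",
  cycleDurationUdf(col("fcurStg"), col("LagCurStg"), col("unixTime")))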

☞ Resources and Notes

Read this source for mechanics, notes, and for defining a lambda function, i.e. a one-line function. See the Databricks docs for more details and null handling. Don't forget you can select which rows to drop using .filter and df.na.drop; there are also .fill and .replace methods available through .na.

Reading the two sources shows the mechanics of UDFs: Spark treats a UDF as a black box and skips its usual optimizations around it, so filters may or may not run before the UDF, especially where nulls are involved. Nulls must therefore be handled with conditions (if-else) around the call or inside the written function itself.
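For example, here are a few hedged ways to deal with nulls before (or instead of relying on) the UDF; the column names reuse the earlier example and are otherwise illustrative.

// Hypothetical sketches of null handling ahead of the UDF call.
import org.apache.spark.sql.functions.col

df = df.na.drop(Seq("fcurStg", "LagCurStg", "unixTime"))   // drop rows with nulls in the input columns
// df = df.na.fill(0, Seq("LagCurStg"))                    // ...or fill them with a default value
// df = df.filter(col("unixTime").isNotNull)               // ...or filter them out explicitly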

UDF for Element-Wise Operations on ArrayType Columns

You can also use a UDF to do element-wise transformations on ArrayType columns. I haven't tested that in Spark Scala myself, only in PySpark, though I'm sure it works the same way. Refer to the PySpark page group, then the UDF page of this notebook, for details and examples.
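A minimal, untested sketch of what this might look like in Scala, assuming a column holding an array of longs; the column and function names are made up.

// Hypothetical, untested sketch: element-wise transformation of an ArrayType(LongType) column.
// Assumes the array column contains no null arrays.
import org.apache.spark.sql.functions.{udf, col}

val addOneToEach = udf((xs: Seq[Long]) => xs.map(_ + 1))   // applied to every element of the array

val transformed = df.withColumn("values_plus_one", addOneToEach(col("values")))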
