DataFrame Manipulations

Joining, concatenating, putting dataframes in a list, dropping columns, etc.

Joining/Merging Two Dataframes Based on a Common Column

Suppose you have df1 and df2 and you want to join them on a common column. The logic is very similar to SQL,

var mergedDF = df1.join(df2, $"col1" === $"col2", "left")

☞ NOTE: If you have the same column name in both dataframes, rename it in one of them first, otherwise you'll end up with two columns of the same name in the merged table and Spark can't disambiguate them; if you then try to drop one of them, both will be dropped. Rename it with .withColumnRenamed("current_col_name", "new_col_name")
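
For example, a minimal sketch (assuming, just for illustration, that both dataframes happen to share a column called name):

//rename the clashing column in df2 before joining, so the merged result stays unambiguous
var df2Renamed = df2.withColumnRenamed("name", "name_right")
var mergedDF = df1.join(df2Renamed, $"col1" === $"col2", "left")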

"left" refers to prioritizing df1 the left dataframe, thus extra values in col1 from df1 will be filled with null in col2 from df2 .

You can chain it with other commands, and use equalTo for the join condition instead; this is the example given in the Spark docs,

//for two dataframes `people` and `department`
people.filter(people.col("age").gt(30))
  .join(department, people.col("deptId").equalTo(department.col("id")))
  .groupBy(department.col("name"), people.col("gender"))
  .agg(avg(people.col("salary")), max(people.col("age")));

☞ Reminder: to see the methods available on a class in the Spark docs, search for its full path. e.g. .join is an operation on a Dataset or a DataFrame, so type org.apache.spark.sql.Dataset in the search bar to see all possible methods on Dataset, along with other useful information.

Combine/Concat/Union/Add/Append Two Dataframes

Here, we're not combining the dataframes horizontally like above (adding columns via a join), but vertically. That is, we paste one dataframe at the end of the other. Usually we do that for dataframes with the same schema, like when new data comes in, or new devices/customers are added, etc.

Done simply with var bigDF = df1.union(df2)

☞ NOTE: The two dataframes must have columns of the same name, order, and type. Keep in mind that union matches columns by position, not by name, so the order matters.
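
If the column order might differ between the two dataframes, one way to line them up (a sketch with made-up column names) is to select the columns explicitly in both before the union,

import org.apache.spark.sql.functions.col
val cols = Seq("id", "value", "event_date")
var bigDF = df1.select(cols.map(col): _*).union(df2.select(cols.map(col): _*))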

Creating a List of Dataframes

To achieve that, we need to know how to create an empty list of the correct type (since Spark Scala is particular about types), then how to append to it, and finally how to combine all the dataframes in the list into one.

Creating an Empty List to Gather Dataframes In

import org.apache.spark.sql.DataFrame
var dfsLst : Seq[DataFrame] = Seq()

Appending to an Empty List

You can use a for loop for that purpose. Don't forget that you need to reassign the result, otherwise the changes won't persist. I show it here with a simple repeated example,

dfsLst = dfsLst :+ df1
dfsLst = dfsLst :+ df2
dfsLst = dfsLst :+ df3
//now `dfsLst` has all three dataframes
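
The same appending can also be written as a loop; here df1, df2, df3 just stand in for whatever dataframes you're collecting,

for (d <- Seq(df1, df2, df3)) {
    dfsLst = dfsLst :+ d
}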

Combine All Dataframes into One

All dataframes must have the same column names, order, and types. Then use union with reduce like so,

val newDF = dfsLst.reduce(_ union _)
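
One caveat: reduce throws an exception on an empty sequence, so if the list could end up empty, guard for that case, for example,

val newDF = if (dfsLst.nonEmpty) dfsLst.reduce(_ union _) else spark.emptyDataFrame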

Case Study

Suppose you have a list of strings (say, your partition keys: dates, customer ids, etc.), and for each element in that list you can build a dataframe with the same schema. You want to combine the dataframes for multiple keys into one; how do you do it?

In my case, I needed to hit a different parquet file for each day, so my list of partition keys is a list of dates. From that list of strings I can create the whole list of dataframes at once,

val dfs_lst: Seq[DataFrame] = dates.map { item =>
  spark.read.parquet(lake_path + s"/data/PartitionDateKey=$item")
}

Then make them into one big dataframe,

val newDF = dfs_lst.reduce(_ union _)

☞ NOTE: The s" " interpolator works similarly to f" " or " ".format() in Python. That is, you can embed a variable in the text; here, our variable is item . This is neat because map serves here as a concise alternative to a for loop: it applies a function or transformation to each element of a collection (an RDD, Dataset, List, Seq, ...) and returns a collection of the same kind holding the results.

Read more on text formatting in "Spark Scala Fundamentals" page in this book.

A few use cases of map are mentioned in the Spark Dataset docs page https://spark.apache.org/docs/2.4.3/api/scala/index.html#org.apache.spark.sql.Dataset

Drop Duplicate Rows in the DataFrame

df.dropDuplicates
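
If you only want to deduplicate based on a subset of columns, dropDuplicates also accepts a list of column names (the names below are made up),

val dedupedDF = df.dropDuplicates(Seq("customer_id", "event_date"))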

Functions on Data Frames

You can use functions to alter data frames too, which is the preferred way to do the prep and cleanup of your data.

Here's the generic syntax,

  • Input arguments should contain the data you want to change, as a DataFrame type. You can also have a list of the column names you want to operate on, and even a window specification if you want to make the function more generic and reusable. Remember that windows allow us to do many operations on a column.

  • Output type is a DataFrame

  • I like to create a new variable for the data frame I want to alter inside the function, to avoid errors in case the original data frame was saved in the session as a val (and is therefore immutable), or to avoid overwriting it if it was assigned as a var .

  • Have a for loop if you want to repeat the same process over several columns, or over all of the columns in the input list (if you have one)

  • Have a bunch of var values if you need to do more complex things with your columns. There's an example of that in the "User Defined Functions - UDF" page of this book.

  • Return the result data frame

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions._ 

def prepCleanDF(df: DataFrame, colsList: List[String], thewindow: WindowSpec): DataFrame = {
    var intermediateDF = df
    
    //do stuff 
    
    return intermediateDF
} 

Here's an example of a function I used to repeat the same process over a list of columns in the original df,

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions._ 

def valDiff(df:DataFrame, colsList:List[String], thewindow:WindowSpec):DataFrame={
    var intermediateDF = df
    for (item <- colsList) {
        intermediateDF = intermediateDF.withColumn(s"lead_$item", lead(col(item), offset=1).over(thewindow))
        intermediateDF = intermediateDF.withColumn(s"$item _diff", bround(col(item) -  col(s"lead_$item"),1) )
        intermediateDF = intermediateDF.drop(s"lead_$item")
    }
    return intermediateDF
}
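
A hypothetical call might look like this (the partition/order columns and the column list are placeholders for your own),

val w = Window.partitionBy("device_id").orderBy("event_time")
val diffDF = valDiff(df, List("temperature", "pressure"), w)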

See here for a function to manipulate data frames with RegEx.

☞ For error handling and exceptions, see "Spark Scala Fundamentals" page of this book.
