Operations on One Column

Transforming a data frame, transforming a column in a data frame, filling nulls, handling date and time columns, etc.

Basic

  • Select a subset of columns to show, using select("col1", "col2", ...)

  • Drop a subset of columns with drop("col1", "col2", ...)

  • Add a new column derived from another column after some transformation: df.withColumn("newcol", some_function($"oldcol")), where $ is a substitute for the col function and indicates that the text refers to a column name. It's needed whenever we need to operate on the column with a function. If you reuse the old column name, you replace the old column with the new transformation.

  • Rename a column df.withColumnRenamed("oldcolName","newcolName")

  • Select columns using SQL expressions with selectExpr. This is a versatile and very useful function, especially when you want to aggregate one column or more and still select others as they are. There will be an example of how I used it in the appropriate section of this book, and a combined sketch of these basic operations follows this list. The official Spark Scala docs give the following example of usage:

  • // The following are equivalent:
    ds.selectExpr("colA", "colB as newName", "abs(colC)")
    ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))

☞ Read more on all available functions on Datasets (they apply to DataFrames too) in Spark's official Scala API docs at https://spark.apache.org/docs/2.4.3/api/scala/index.html#org.apache.spark.package then type "org.apache.spark.sql.Dataset" in the search bar in the top left corner.

IMPORTANT: None of these changes will persist on their own; you have to assign the result back to the dataframe (if it was declared as a var) or to a new dataframe, i.e.

//if it's defined as a "val"
val df = spark.read.parquet("path_to_parquet")
val newdf = df.withColumn(...)

//or
var df = spark.read.parquet("path_to_parquet")
df = df.withColumn(...)

e.g.

import org.apache.spark.sql.functions._

var df = spark.read.parquet("some_file")
df = df.withColumn("newcol", bround($"oldcol",2))

In this example, we rounded a float column to 2 decimal places in a new column. bround is the name of the function, imported from the functions package, and $ tells Spark the text refers to a column. You can also use the col() function instead of the dollar sign: bround(col("oldcol"), 2)

☞ NOTE: You can chain these changes, i.e. add another .withColumn() statement after the first one, to transform another column at the same time.
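A quick sketch of such chaining, continuing the var df example above (anothercol and upper are just illustrative choices, not from the original example):

df = df.
withColumn("newcol", bround($"oldcol", 2)).
withColumn("newcol2", upper($"anothercol"))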

Substring

substring can take a StringType or TimestampType column. Syntax: substring(colname, starting_pos, wanted_length)

Example: df.withColumn("newCol", substring($"oldCol", 0, 3)) returns the first 3 characters of the column oldCol.

Don't forget to import substring from the functions package first.
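A minimal sketch (the column name productCode is hypothetical); note that the starting position is effectively 1-based:

import org.apache.spark.sql.functions.substring

// first three characters of a string column
val withPrefix = df.withColumn("prefix", substring($"productCode", 1, 3))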

Adding an Index Column

WARNING: Spark doesn't use an index column because its dataframes are distributed by nature, i.e. the dataframe is broken up into partitions and sent to different nodes in the cluster to perform the wanted operations. However, sometimes those of us coming from Python really need an index column for whatever reason. To avoid potential problems that might arise, I recommend not adding an index column to any dataframe that isn't small enough to be one partition, i.e. to fit in one node's memory.

//Example
df.
sort($"time".asc).
coalesce(1).
withColumn("Number", monotonicallyIncreasingId)

Details:

If you don't make it into one partition using coalesce(1), then monotonically_increasing_id will create long integers that are monotonically increasing alright, but not consecutive, as I explain here. The reason is that the generated long integers depend on the partition a row falls in. Read more in the official Spark Scala docs, searching for "org.apache.spark.sql.functions"

If you have an RDD instead of a data frame, you can also use zipWithIndex or zipWithUniqueId. Read more on them in the full post of the last link. However, when I tried it by converting the dataframe to an RDD first, Spark didn't accept converting the zipped RDD back into a dataframe using the toDF function. For full details about the zipWith functions, see the Scala API in the official Spark docs and type "org.apache.spark.rdd" in the search box.
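As a hedged sketch of the RDD route (my own workaround, not something from the linked post): zipWithIndex yields (Row, Long) pairs, so instead of toDF you can rebuild the dataframe with createDataFrame and a schema extended with the index field.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField}

// zip each Row with its index, then fold the index back into the Row
val indexedRdd = df.rdd.zipWithIndex.map { case (row, idx) =>
  Row.fromSeq(row.toSeq :+ idx)
}

// extend the original schema with the new index column and rebuild the dataframe
val indexedDf = spark.createDataFrame(
  indexedRdd,
  df.schema.add(StructField("index", LongType, nullable = false))
)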

Get Distinct Elements of a Column

df.select("colName").distinct.show .show on the end, prints out the results. You can also find how many unique values in a column by adding .count to the end of this command.

Create a List from a Column

There are a few ways to do it; according to this StackOverflow answer, the most efficient and elegant way is df.select("colName").map(r => r.getString(0)).collect.toList

Other ways, df.select("id").collect().map(_(0)).toList OR df.select("id").rdd.map(r => r(0)).collect.toList OR df.select("id").map(r => r.getString(0)).collect.toList Source

☞ Some explanation,

.map(item => "things_done_to_item") applies item-wise changes to the items in a Row, Array, etc. Therefore, if I want the elements of an object as Strings in a list, I use the .getString() method on each element of that object: colName.map(r => r.getString(0)).collect.toList

Why it takes a long time to convert a column from a data frame to a list,

Parquet files are columnar objects by design, which has advantages for storage and lets Spark read their schema readily. Most of the time you will be dealing with parquet files. In practical terms, columnar means you can operate on columns directly with Spark native functions or a UDF, without changing the dataframe back to an RDD. So what happens when we call .rdd.map() on a data frame to convert it to a list? In this case, we are breaking the column back into an array of Row-type objects. Each Row is like a dictionary, with the keys being column names and the values being that row's value for each column. That Row appears to us like a tuple, so to extract the wanted element and transform it into a list, we need some sort of a for-loop to extract each element on its own; that's what .map does, at least for our practical purposes. Finally, .collect returns a list that contains all of the elements in this RDD. .collect is an RDD method, i.e. we can apply it only on RDDs. Check out the Spark Scala API official docs on "org.apache.spark.rdd" to see all available methods on RDDs.

Filter a Column on Elements From a List, Using .isin()

Suppose we have a list of strings, defined as follows, val depts:List[String]= List("Finance","IT","Accounting")

More on lists in a dedicated section later.

And we want to filter all rows in a data frame called "inventory" where the column "dept_nm" has values from this list. Then we can do so with isin and the list expansion syntax :_* which, to my understanding, unpacks the list elements as varargs. inventory.filter('dept_nm.isin(depts:_*)).show

Read more about what :_* really does in a comment to this post, by the same author.
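Putting the pieces together as a small runnable sketch (table and column names as above):

import org.apache.spark.sql.functions.col

val depts: List[String] = List("Finance", "IT", "Accounting")

// keep only the rows whose dept_nm appears in the list; :_* expands the list into varargs
inventory.filter(col("dept_nm").isin(depts: _*)).show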

Add a New Conditional Column in a One-Liner

Create a new column having 1 or true if a condition is met, and 0 or false otherwise, applied row by row.

Examples,

//if two columns are equal
df.
withColumn("newCol", when($"col1"===$"col2" ,1).otherwise(0))

//if col1 contains a_word, or col2 has 5 characters
df.
withColumn("newCol", when(($"col1".contains("a_word")) || (length($"col2") === 5), 0).otherwise(1))

//with Boolean
df.withColumn("new_col", when($"pid" === $"id", false).otherwise(true)))

//with Nulls and empty
df.
withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1))

Change Type of any Column

  1. Using the .to functions available in the functions package, like so: df.withColumn("time", to_timestamp($"DateAndTime")). There are only 5 available .to functions: to_csv, to_date, to_json, to_timestamp, to_utc_timestamp

  2. Using .cast(), like so: df.withColumn("newCol", $"oldCol".cast("long")) to change a column's type from anything numeric to LongType (credit: this answer)

You can enter the data type in .cast() in two ways: either the full type name, e.g. IntegerType (imported from org.apache.spark.sql.types), or its canonical string representation, e.g. "int"

Available data types, in their string representations: string, boolean, byte, short, int, long, float, double, decimal, date, timestamp
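A minimal sketch of both forms (assuming a numeric column named "oldCol"):

import org.apache.spark.sql.types.IntegerType

// full type name
df.withColumn("newCol", $"oldCol".cast(IntegerType))

// canonical string representation
df.withColumn("newCol", $"oldCol".cast("int"))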

☞ Read details related to .cast function in the official doc about Column. (Find it yourself: official Docs --> Scala API --> type "org.apache.spark.sql.Column" --> find "cast" in page).

Remember that a dataframe resulting from any modification is not saved until you reassign it or save it to a new variable.

Dealing with Date and Time Columns

  1. Change Type. If the date and time column in your dataframe is saved as a string, you will need to change it to TimestampType; you can do so in two ways, as we saw above.

  2. Truncate. If the date has day, month, year, hour, minute, second, etc., you can truncate it with date_trunc, which takes a TimestampType column and returns a TimestampType column. Read more details in the official Spark Scala API, in the "org.apache.spark.sql.functions" package, under the "Date time functions" section.

  3. Truncate Neatly. The date_trunc function turns the unwanted segments of the datetime column into zeros and keeps them, e.g. "2021-02-01 12:34:13" truncated to day becomes "2021-02-01 00:00:00". If you want just "2021-02-01", you need to use the substring function, and the result column will be of StringType. df.withColumn("day", substring($"time", 0, 10)) returns the first 10 characters of the column time, which are "yyyy-mm-dd" without hours, minutes, seconds, etc. The longest timestamp format I came across is yyyy-MM-dd HH:mm:ss.SSSS

  4. Convert to Unix Timestamp. Using unix_timestamp; the result column will be a LongType integer.

☞ Discover all other available functions for dealing with dates in the official Spark Scala API docs, in the "org.apache.spark.sql.functions" page, under the "Date time functions" section.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//1- change to TimestampType
df.select( to_timestamp('dateTimeColName) )
//or
df.select( 'dateTimeColName.cast("timestamp") )

//2- truncate yyyy-mm-dd
df.select( date_trunc("day", 'dateTimeColName) )

//3- truncate neatly
df.select( substring('dateTimeColName, 0, 10) )

//4- convert to Unix Timestamp
df.select( unix_timestamp('dateTimeColName) )

Filling Nulls

Null filling is done with the methods included in the Na functions. There are three options: fill, replace, and drop. We will talk about fill here.

There are several ways to use the df.na.fill() method,

  • Fill every null with the same value, using df.na.fill(0) to fill every null in every numeric column with zero. Columns of other types will remain unchanged.

  • Select specific values for specific columns. There are two ways to do that; one of them is df.na.fill(0, Seq("col1")) where "col1" is a numeric column. Similarly, you can do df.na.fill("unknown", Seq("col2")) where "col2" is a string column. This syntax also means you can extend the Seq to more columns of the same type: df.na.fill("unknown", Seq("col1", "col2", ...)). That is, you can impute many columns with the same value, as long as the columns are of the same type. From this post. You can also use a List or Array instead of Seq. A sketch of these variants follows this list.
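A minimal sketch of the fill variants above (the column names are made up for illustration):

// fill nulls in every numeric column with 0
val filledAll = df.na.fill(0)

// fill nulls in a numeric column with 0, and in a string column with "unknown"
val filledSome = df.na.fill(0, Seq("amount")).na.fill("unknown", Seq("dept_nm"))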

You can fill many columns with different values in the same command. Read more about that in "Operations on Multiple Columns at Once" page in this book.

☞ Read about Na functions for full syntax and details in the official Spark Scala API docs, searching for "org.apache.spark.sql.DataFrameNaFunctions" package.

Creating a Column of Nulls

Sometimes you might need to create an empty column. Perhaps to fill it later with a conditional value, or for whatever other reason.

You will need the lit function from the functions package: df.withColumn("empty", lit(null)) or df.withColumn("empty", lit(null: String)), both of which create a column of NullType, shown in the schema as "empty: null (nullable = true)".
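A NullType column can't hold anything but nulls; a sketch of one way (my addition, not from the original text) to give the empty column a concrete type is to cast it right away:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

// the schema will then show "empty: string (nullable = true)"
val withEmpty = df.withColumn("empty", lit(null).cast(StringType))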

Creating a Column with Constant Value

Use lit function. Example, df.withColumn("Country", lit("USA"))

☞ Even more details on withColumn clause, with a bonus glimpse into building a schema to make a data frame.

Concatenate Values in Different Columns

Suppose you want to merge string values in col1, col2, and col3. You can do that in two ways according to this post: either using concat or directly adding them.
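A minimal sketch with concat, plus concat_ws if you want a separator between the values (the separator variant is my addition, not from the post):

import org.apache.spark.sql.functions.{concat, concat_ws}

// glue the three string columns together
df.withColumn("merged", concat($"col1", $"col2", $"col3"))

// the same, but with a "-" between the values
df.withColumn("merged", concat_ws("-", $"col1", $"col2", $"col3"))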

Advanced - Using Window Expression

We will need Window expressions to do the following tasks: they apply a specific grouping to our dataframe, called a partition, in a desired order. See the "Window" page in this book for more details.

Aggregating a Column

This will return a new column, having the wanted aggregation, over the chosen window specifications.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window1 = Window.partitionBy("Id", "day").orderBy($"time".asc)


val df2 = df1.
withColumn("maxCol1", max($"col1").over(window1) ).
withColumn("sumCol1", sum('col1).over(window1) )

From this answer.

Notice that this is different from just finding, say, the maximum value of a column, which is just one number. You can get that with a simple select statement like so: df.selectExpr("max(colname)").show or df.select(max('colname)).show
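Another way I believe works for the single-number case is agg:

import org.apache.spark.sql.functions.max

// a single-row, single-column dataframe holding the maximum of colname
df.agg(max($"colname")).show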

☞ Find all aggregate functions and window functions in the official Spark Scala API docs for the functions package, under the sections "aggregate functions" and "window functions" respectively.

☞ For other ways to aggregate and more details, see the "Aggregations" page of this book.

Shifting a Column

That is, create a new column from the old one, where values are shifted down or up by a certain number of rows, called the offset.

We need to shift columns up or down when we need to, say, difference the value of a row from the row before, based on some partition (grouping) of data, and a certain order within the partition (group).

We shift a column down with lag and shift up with lead

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window1 = Window.partitionBy("Id", "day").orderBy($"time".asc)

//lag -shift down- one row
val df2 = df1.
withColumn("cLag", lag($"col1", offset=1).over(window1))

//lead -shift up- one row
val df2 = df1.
withColumn("cLead", lead('col1, offset=1).over(window1))

I didn't specify any value for the defaultValue parameter, so the rows that have nothing shifted into them will hold null in the resulting column. You can set this value to whatever you want.

Example,

Suppose we have the sample dataframe built by the code below, and we want to create a lagged column with offset = 1.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//create the dataframe
val DF = Seq(
  ("A", 1, "2017-12-23 09:00"),
  ("A-1", 1, "2017-12-23 11:00"),
  ("A-1", 1, "2017-12-23 10:00"),
  ("A", 2, "2017-12-23 08:00"),
  ("A", 3, "2017-12-23 07:00"),
  ("A-2", 4, "2017-12-23 12:00"),
  ("A-1", 4, "2017-12-23 12:00"),
  ("B", 1, "2017-12-23 09:00"),
  ("B-1", 1, "2017-12-23 11:00"),
  ("B-1", 1, "2017-12-23 10:00"),
  ("B", 2, "2017-12-23 08:00"),
  ("B", 3, "2017-12-23 07:00"),
  ("B-2", 4, "2017-12-23 12:00"),
  ("B-1", 4, "2017-12-23 12:00")
).toDF("id", "col1", "time")

//create the Window
val window2 = Window.partitionBy("id").orderBy('time.asc)

//show the Lag column, without a defaultValue
DF.withColumn("col1Lag", lag('col1,offset=1).over(window2)).show

//show the Lag column, with a defaultValue of zero
DF.withColumn("col1Lag", lag('col1,offset=1, defaultValue=0).over(window2 )).show

//show the Lead column, without a defaultValue
DF.withColumn("col1Lag", lead('col1,offset=1).over(window2)).show

Comparing results

See how, for every group of "id"s ordered by "time" ascending, the values are shifted down one row (when using lag), and the extra row at the top is filled with null, or with the defined defaultValue.

Forward and Backward Filling of Nulls

Main idea from this answer

Explanation: to my understanding, the coalesce used here is the column function coalesce(col1, col2, ...) from the functions package, not the coalesce(n) that controls the number of partitions. It returns the first non-null value among its arguments, so each row keeps its own col1 value when it is present and otherwise takes the value produced by last. last($"col1", true) is last with ignoreNulls = true; since the window is ordered and its default frame runs from the start of the partition up to the current row, it returns the most recent non-null value, i.e. a forward fill. Filling backward works the same way with first over a frame that looks ahead instead (see the sketch after the code below).

☞ Look up coalesce, last, and first in the official docs for syntax and argument options, under the functions package

// Forward-fill
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window1 = Window.partitionBy("Id", "day").orderBy($"time".asc)

val df2 = df1.
withColumn("col1", coalesce($"col1", last($"col1", true).over(window1)))
