Collection of Notes. Catch-It-All Page

A collection of notes. Even more commands, procedures, links and resources

Filling Nulls in a Subset of Columns

df.na.fill("value", subset=["col1","col2",...]).dropDuplicates() Also, you can fill one column with one value, using a dictionary and the .fillna method: df.fillna({'colname': value})
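A minimal sketch of both forms, assuming a dataframe df with hypothetical columns "col1", "col2", and "age":

# fill the same value into several columns, then drop duplicate rows
df = df.na.fill("missing", subset=["col1", "col2"]).dropDuplicates()

# fill one value per column using a dictionary (note the colon: it's a dict, not a set)
df = df.fillna({"age": 0, "col1": "unknown"})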

Converting a Column to a Python List

Generally, it's not a good idea to convert a Spark column to a list or to Pandas or Numpy objects, because you're forcing the entire column to be flattened and fit in the memory of the driver node, which defeats the purpose of Spark and its distributed computing model; and if the column is too big and you don't have enough memory, the job will take a long time and/or fail. Consider solving your problem with a User Defined Function instead. See the UDFs page of this book.

Still, sometimes we need a list, for example to filter a column on it. There are several ways to convert a column to a list, some more efficient than others; Converting a PySpark DataFrame Column to a Python List - MungingData and Convert spark DataFrame column to python list - Stack Overflow show code and benchmarking details for each one. They show that df.select('colname').toPandas()['colname'] is the fastest way, and that the list comprehension [row['colname'] for row in df.select("colname").collect()] is the second least efficient. Choose your favorite. The links also give good tips and dos and don'ts of converting to a list.
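For illustration, a sketch of the two approaches mentioned above, assuming a dataframe df with a column "colname":

# fastest of the benchmarked options: bring the column to the driver through Pandas
values_lst = list(df.select("colname").toPandas()["colname"])

# list comprehension over .collect() -- simpler, but slower on large columns
values_lst = [row["colname"] for row in df.select("colname").collect()]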

Meandering: How to parallelize .collect in PySpark

In short, use .glom() before .collect(). If x is some simple list or another object, and rdd1= sc.parallelize(x), then .glom() returns an RDD created by coalescing all elements within each partition into a list, so rdd2= rdd1.glom().collect() returns a list of lists, where the number of inner lists equals the number of partitions. In other words, .glom() gathers the elements in each partition.
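A small sketch of the idea, assuming an existing SparkContext sc and a toy list x:

x = list(range(10))
rdd1 = sc.parallelize(x, numSlices=4)    # 4 partitions

flat = rdd1.collect()            # [0, 1, 2, ..., 9]
nested = rdd1.glom().collect()   # e.g. [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]], one inner list per partition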

Sources: How to Parallelize and Distribute Collection in PySpark | by Nutan | Medium and 3 Methods for Parallelization in Spark | by Ben Weber | Towards Data Science

Some Useful Functions

  • collect_list and collect_set need a Window or a .groupBy().agg(); they collect all row values of the wanted column into a list (with duplicates) or a set (without duplicates), over the given partition/aggregation.

  • greatest and least, which are similar to max and min respectively, but compare values across columns row by row and skip nulls; see the sketch after this list.

  • Read the full list in https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions or by searching for "spark official docs latest" --> API Docs --> Python --> API Reference --> Functions

  • While f.array(f.col("col1"), f.col("col2")) creates an ArrayType column from the two given columns, there is also a set of functions for handling ArrayType columns that already exist; see Spark SQL Array Functions Complete List — SparkByExamples.
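A short sketch of the last two points, assuming hypothetical columns "dose_a", "dose_b", and "dose_c":

import pyspark.sql.functions as f

# greatest/least compare values across columns, row by row, skipping nulls
df = df.withColumn("max_dose", f.greatest(f.col("dose_a"), f.col("dose_b"), f.col("dose_c")))
df = df.withColumn("min_dose", f.least(f.col("dose_a"), f.col("dose_b"), f.col("dose_c")))

# build an ArrayType column from existing columns, then apply an array function to it
df = df.withColumn("dose_arr", f.array(f.col("dose_a"), f.col("dose_b"), f.col("dose_c")))
df = df.withColumn("dose_arr_no_dups", f.array_distinct(f.col("dose_arr")))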

Editing String Variables in PySpark

Combining two string columns into one with Concatenation

The classic example is combining a "first_name" field with a "last_name" field. Use f.concat() or f.concat_ws() from Spark; the latter means concatenating with a separator, i.e. it adds a separator between the combined fields.
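A minimal sketch, assuming hypothetical "first_name" and "last_name" columns:

import pyspark.sql.functions as f

df = df.withColumn("full_name", f.concat(f.col("first_name"), f.col("last_name")))

# concat_ws("separator", col1, col2, ...) puts the separator between the fields
df = df.withColumn("full_name", f.concat_ws(" ", f.col("first_name"), f.col("last_name")))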

Substring to take a part of the string values

Use f.substring(colname, pos, len), where pos is the starting position for the wanted cut (1-based) and len is how many characters you want to scoop up. Example: df.withColumn("location_code", f.substring(f.col("item_key"), 5, 3)) will take 3 characters starting from the 5th one. There are also RegEx functions if you're handy with that: regexp_extract and regexp_replace, in addition to a dataframe method df.colRegex; look those up in the docs, I haven't personally used them.

Example that combines both concepts, concat and substring

df2= df1\
	.withColumn("col1", f.col("date_as_string").cast("string"))\
	.withColumn("date", f.concat_ws("-",
		f.substring(f.col("col1"), 1, 4),
		f.substring(f.col("col1"), 5, 2),
		f.substring(f.col("col1"), 7, 2)).cast("date"))\
	.drop("col1")
# "-" as the separator builds a yyyy-MM-dd string, which casts cleanly to DateType

Dealing With Date Columns in PySpark

There are great resources out there for dealing with dates in Spark; I list a couple below: SparkByExamples - date functions and PySpark Date Functions – SQL & Hadoop

Subtracting timestamp columns

Convert the timestamp columns you need to subtract (after lagging them, for example) to LongType before subtracting, or cast them inline. Otherwise you'd get the difference as an IntervalType, which is a dictionary-like type holding each time segment of the timestamp and the difference value. Below is a full worked code template for taking the difference between two successive rows of time, with respect to one ID value.

import pyspark.sql.functions as f
from pyspark.sql.types import * 
from pyspark.sql.window import Window

# the window is needed in order to use f.lead() function 
win0= Window.partitionBy("idcolumn").orderBy("timestampcol")

# create the next timestamp on the same row
df= df.withColumn("next_timestamp", 
	f.lead(f.col("timestampcol")).over(win0))
	
# compute the time difference	
df= df.withColumn("time_diff", 
	f.col("next_timestamp").cast("long") - f.col("timestampcol").cast("long"))

Recall that f.lead() puts the next row's value on the current row.

Source

Join on multiple columns - PySpark

See apache spark sql - Pyspark: Reference is ambiguous when joining dataframes on same column - Stack Overflow. Syntax:

df1\
	.join(df2, (df1.idcol == df2.idcol) & (df1.idcol2 == df2.idcol2) & (df1.idcol3 == df2.idcol3), how='left')

What goes inside are conditions like df1.idcol == df2.idcol. Don't forget to drop one of the equal columns after the join, particularly if they have the same name, e.g. df1.join(df2, ...).drop(df2.idcol).drop(df2.idcol2), and so on for any other duplicated columns.

If the join doesn't accept the dfname.colname format like that, alias the dataframes and use f.col("alias1.colname") == f.col("alias2.colname"), as sketched below.
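A sketch of that aliased form, with hypothetical column names "idcol" and "idcol2":

import pyspark.sql.functions as f

joined = df1.alias("a").join(
	df2.alias("b"),
	(f.col("a.idcol") == f.col("b.idcol")) & (f.col("a.idcol2") == f.col("b.idcol2")),
	how="left")

# drop the duplicated right-hand columns after the join
joined = joined.drop(f.col("b.idcol")).drop(f.col("b.idcol2"))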

Union Multiple DataFrames

Important: If you want to union multiple dataframes as you read them, just read them all at once in the spark.read.parquet() command (pass all the paths), and it will union them in one go, which is far more efficient, since unions, like joins, are expensive; it even makes the dataframe ready and faster to operate on for subsequent actions.

# files_lst is a list of strings, each of which is a parquet full path and name

def read_bunch(files_lst):
	from functools import reduce
	from pyspark.sql import DataFrame
	
	dfs_lst= []
	for item in files_lst:
		dfs_lst.append(spark.read.parquet(item))
	
	# union all the dataframes in the list into one (outside the loop, so it runs once)
	local_df= reduce(DataFrame.union, dfs_lst)
	# now you can discard dfs_lst, no need for it anymore
	
	return local_df.distinct()

Resource for this function, https://walkenho.github.io/merging-multiple-dataframes-in-pyspark/

Then to read it, you can do read_bunch(files_lst= files_rx[:3]).printSchema()

NOTE: If you have the same parquets split over different parts (e.g. updates, new customers added, etc.), don't use the union above, because, like joins, it's expensive; instead, read them all at once by passing all the paths to the spark.read.parquet() statement, and it will union them automatically.
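A sketch of that direct read, assuming files_lst is the same list of parquet paths; spark.read.parquet() takes the paths as separate arguments, so the list is unpacked with *:

# read all parts at once; Spark unions them into a single dataframe
df_all = spark.read.parquet(*files_lst)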

Columns and data types of a Spark dataframe, as a dataframe

from pyspark.sql.types import *

dfcols_type= df.dtypes

dfcols_schema= StructType([
StructField("colname", StringType(), True), 
StructField("colType", StringType(), True)
])

cols_types_df= spark.createDataFrame(dfcols_type, schema= dfcols_schema)

cols_types_df.show()
# .display() instead of .show() if you're on DataBricks, then you can download cols_types_df as CSV

Get row values to an ArrayType column

aggdf.withColumn("all_drugs", f.collect_list(f.col("colname")).over(window1))

This function can also be used inside the aggregation in a groupBy().agg() statement.

There's also f.collect_set() to capture only unique values.
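For instance, a sketch of the aggregation form (with f being pyspark.sql.functions as elsewhere in these notes), using hypothetical "patient_id" and "drug" columns:

agg_df = df.groupBy("patient_id").agg(
	f.collect_list("drug").alias("all_drugs"),      # keeps duplicates
	f.collect_set("drug").alias("distinct_drugs"))  # unique values only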

Converting a dataframe between Python and Scala

You can pass variables (or data frames) from Scala to Python, and vice versa, using %python or %scala in a cell of a DataBricks notebook, by registering the data frame you want as a SQL table (temporary view).

See apache spark - Zeppelin: Scala Dataframe to python - Stack Overflow. In a Scala cell in the DataBricks notebook, register the data frame as a temporary view named "df", like this:

// registerTempTable in Spark 1.x
df.createTempView("df")
// you can also use
df.createOrReplaceTempView("dfview")

Now read it in the Python cell:

df= sqlContext.table("df")

Or, if that was from Python to, say, SQL, then you can read the temp view from a SQL cell. In a Zeppelin notebook, you can use the magic command, either single or double % depending on the notebook provider.

%%sql
SELECT * FROM dfview

The answer link mentioned above, https://stackoverflow.com/a/35720597/11381214, has other options to do it as well.

The below didn't work for passing a df from Python to Scala:

%python
python_df.registerTempTable("temp_table")

Passing a df from Scala to Python

// in the Scala cell
scalaDF.registerTempTable("some_table")

# then in the Python cell
spark.table("some_table")

Registering it as a SQL table/view allows you to plot it in PySpark/Python even if it's not natively a PySpark/Python DF --> see https://stackoverflow.com/questions/45884823/pass-variables-from-scala-to-python-in-databricks.

Optimizing Spark - Skip a lot of jobs by optimizing shuffle partitions

spark.conf.set("spark.sql.shuffle.partitions", 8)
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

Either one of those suffices when it comes to skipping jobs; both do the same thing. Of course you can choose the number of partitions; usually you want it to be a multiple of the number of cores you have (each node has one or more cores). DataBricks' Adaptive Query Execution optimizes most of this automatically, according to an optimization course instructor from DataBricks.
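For example, a sketch of setting the partition count to a multiple of the available cores (sc.defaultParallelism is typically the total number of cores in the cluster):

# e.g. 2 shuffle partitions per core
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism * 2)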

DataBricks specific commands to configure shuffle size

#%sql
set databricks.adaptive.autoOptimizeShuffle.enabled= true;
-- set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
-- set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

Importing your own library to run on a Spark cluster / Zeppelin notebook

In DataBricks, you can copy the absolute notebook path to run it elsewhere. Thus, you can create a notebook containing all the functions you use frequently, and run it from another notebook to import all the objects in it. You run a DataBricks notebook with the command %run "notebook path in DataBricks workspace"; nothing else can be in the cell, not even a comment.
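For example, a cell containing only the %run magic, with a hypothetical workspace path:

%run "/Users/someone@company.com/shared/common_functions"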
