Collection of Notes. Catch-It-All Page
A collection of notes. Even more commands, procedures, links and resources
Filling Nulls in a Subset of Columns
df.na.fill("value", subset=["col1","col2",...]).dropDuplicates()
Also, you can fill individual columns, each with its own value, by passing a dictionary to the .fillna method: df.fillna({'colname': value})
Converting a Column to a Python List
Generally, it's not a good idea to convert a Spark column to a list or to Pandas or NumPy objects, because you force the entire column to be collected into the memory of the driver node, which defeats the purpose of Spark and its distributed computing system. If the column is too big and you don't have enough memory, the job will take a long time and/or fail. Consider solving your problem with a User Defined Function instead; see the UDFs page of this book.
Still, sometimes we need a list, to filter a column on it for example. There are several ways to convert a column to a list, some more efficient than others. Converting a PySpark DataFrame Column to a Python List - MungingData and Convert spark DataFrame column to python list - Stack Overflow show code and benchmarking details for each one. The benchmarks show that list(df.select('colname').toPandas()['colname']) is the fastest way, and that the list comprehension [row['colname'] for row in df.select("colname").collect()] is the second least efficient way. Choose your favorite. The linked pages also give good tips, dos and don'ts of converting to a list.
Meandering: How to parallelize .collect in PySpark
In short, use .glom() before .collect(). If x is some simple list or another object, and rdd1 = sc.parallelize(x), then .glom() returns an RDD created by coalescing all elements within each partition into a list, so rdd2 = rdd1.glom().collect() returns a list of lists, where the number of inner lists equals the number of partitions. In other words, .glom() gathers the elements in each partition.
Sources: How to Parallelize and Distribute Collection in PySpark | by Nutan | Medium and 3 Methods for Parallelization in Spark | by Ben Weber | Towards Data Science
Some Useful Functions
collect_list or collect_set: these need a Window or .groupBy().agg(). They collect all row values of the wanted column into a list (with duplicates) or a set (without duplicates), over the given partition/aggregation.
greatest and least: similar to max and min respectively, but they compare several columns within each row instead of aggregating over rows, and they skip nulls.
Read the full list in https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions or by searching for "spark official docs latest" --> API Docs --> Python --> API Reference --> Functions
While f.array(f.col("col1"), f.col("col2")) creates an ArrayType column from the two given columns, there are also functions for handling ArrayType columns that already exist; see Spark SQL Array Functions Complete List - SparkByExamples
String Variables Editing in PySpark
Combining two string columns into one with Concatenation
The classical example is combining a "first_name" field with a "last_name" field, using f.concat() or f.concat_ws() from Spark. The latter means concatenate with separator, that is, it adds a separator between the combined fields.
Substring to take a part of the string values
Use f.substring(colname, pos, len), where pos is the starting position of the cut (counting from 1) and len is how many characters you want to scoop up. For example, df.withColumn("location_code", f.substring(f.col("item_key"), 5, 3)) takes 3 characters starting from the 5th one. There are also RegEx functions if you're handy with that: regexp_extract and regexp_replace, in addition to a dataframe method, df.colRegex; look that up in the docs, I haven't personally used it.
Example that combines both concepts, Concat and Substring
Dealing With Date Columns in PySpark
There are great resources out there for dealing with dates in Spark; I list two below: SparkByExamples - date functions and PySpark Date Functions – SQL & Hadoop
Subtracting timestamp columns
Convert the timestamp columns you need to subtract (after lagging them, for example) to LongType before subtracting, either in a separate step or inline. Otherwise you'd get the difference as an interval type, which breaks the difference down into time segments instead of giving a single number. Below is a full worked code template for taking the difference between two successive rows of time, per ID value.
Recall that f.lead() puts the next row's value in the current row.
Join on multiple columns - PySpark
What goes inside the join are conditions like df1.idcol == df2.idcol, combined with & when there are several. Don't forget to drop one of each pair of equal columns, particularly if they have the same name: df1.join(df2, ...).drop(df2.idcol, df2.idcol2, ...)
If it doesn't accept the dfname.colname format like that, alias the dataframes and use f.col("df1name.colname") == f.col("df2name.colname")
Union Multiple DataFrames
Important: if you want to union multiple dataframes as you read them, just pass all their paths to a single spark.read.parquet() command (unpacking a Python list with *), and it will union them at once. This is far more efficient than unioning them afterwards, since unions, like joins, are expensive, and it even makes the dataframes ready and faster to operate on for subsequent actions.
Resource for this function, https://walkenho.github.io/merging-multiple-dataframes-in-pyspark/
Then to read it, you can do read_brunch(files_lst= files_rx[:3]).printSchema()
NOTE: if you have the same parquets split over different parts, e.g. updates or new customers added, don't use the union above because, like joins, it's expensive. Instead, read them all at once by passing all the paths to one spark.read.parquet() statement; it will union them all automatically.
Columns and data types of a Spark dataframe, as a dataframe
Get row values to an ArrayType column
f.collect_list(), used over a Window, gathers the row values into an ArrayType column. It can also be used inside the aggregation in a groupBy().agg() statement.
There's also f.collect_set() to capture only unique values.
Converting a dataframe from Python to, and from Scala
You can pass variables (or data frames) from Scala to Python, and vice versa, by registering the data frame you want as a SQL table and then using %python or %scala in the cell of the DataBricks notebook.
Source: apache spark - Zeppelin: Scala Dataframe to python - Stack Overflow. In a Scala cell in the DataBricks notebook, register the data frame as a temporary view named df, with this:
Now read it in the Python cell:
Or, if you registered the view from Python, you can read it from a SQL cell, say. In a Zeppelin notebook, you can use the magic command too, with either a single or a double % depending on the notebook provider.
The answer link mentioned above, https://stackoverflow.com/a/35720597/11381214, has other options to do it as well.
Below didn't work, passing a df from Python to Scala
Passing a df from Scala to Python
Registering it as a SQL table allows you to plot it in PySpark/Python even if it is not natively a PySpark/Python DF; see https://stackoverflow.com/questions/45884823/pass-variables-from-scala-to-python-in-databricks.
Optimizing Spark - Run a lot of jobs by optimizing shuffle size
Either one of those suffices when skipping jobs; both do the same thing. Of course, you can choose the number of partitions; usually you want it to be a multiple of the number of cores you have. Each node has one or more cores. According to the instructor of a DataBricks optimization course, DataBricks Adaptive Query Execution optimizes most of these things automatically.
DataBricks specific commands to configure shuffle size
Importing your own library to run on Spark cluster Zeppelin notebook
In DataBricks, you can copy the absolute notebook path to run it elsewhere. Thus, you can create a notebook holding all the functions you use frequently, and run it in another notebook to import all the objects in it. You run a DataBricks notebook with the command: %run "notebook path in DataBricks workspace"
Nothing else can be in the cell, not even a comment.