Collection of Notes. Catch-It-All Page

A collection of notes. Even more commands, procedures, links and resources

Filling Nulls in a Subset of Columns

Use df.na.fill("value", subset=["col1","col2",...]).dropDuplicates(). You can also fill one column (or several) with specific values by passing a dictionary to the .fillna() method: df.fillna({'colname': value})
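A quick sketch of both forms (df, the column names, and the fill values are placeholders):

```python
# fill nulls with "value" only in col1 and col2, then drop duplicate rows
df_filled = df.na.fill("value", subset=["col1", "col2"]).dropDuplicates()

# fill specific columns with specific values by passing a dict to fillna
df_filled = df.fillna({"col1": 0, "col2": "unknown"})
```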

Converting a Column to a Python List

Generally, it's not a good idea to convert a Spark column to a list or to Pandas or NumPy objects, because that forces the entire column to be flattened and fit into the memory of the driver node, which defeats the purpose of Spark and its distributed computing. If the column is too big and you don't have enough memory, the job will take a long time and/or fail. Consider solving your problem with a User Defined Function instead; see the UDFs page of this book.

Still, sometimes we need a list, for example to filter a column on it. There are several ways to convert a column to a list, some more efficient than others. Converting a PySpark DataFrame Column to a Python List - MungingData and Convert spark DataFrame column to python list - Stack Overflow show code and benchmarking details for each one. They show that df.select('colname').toPandas()['colname'] is the fastest way, while the list comprehension [row['colname'] for row in df.select("colname").collect()] is the second least efficient. Choose your favorite. The links also give good tips and dos and don'ts of converting to a list.
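For reference, the two approaches mentioned look roughly like this (colname is a placeholder):

```python
# fastest per the benchmarks linked above: go through pandas
values = df.select("colname").toPandas()["colname"].tolist()

# list comprehension over collect(); simple, but slower on large columns
values = [row["colname"] for row in df.select("colname").collect()]
```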

Meandering: How to parallelize .collect in PySpark

In short, use .glom() before .collect(). If x is some simple list or other object and rdd1 = sc.parallelize(x), then rdd1.glom() returns an RDD created by coalescing all elements within each partition into a list, so rdd2 = rdd1.glom().collect() returns a list of lists, where the number of inner lists equals the number of partitions. In other words, .glom() gathers the elements of each partition.
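A minimal sketch (assuming sc is the SparkContext, e.g. spark.sparkContext):

```python
x = list(range(10))
rdd1 = sc.parallelize(x, numSlices=4)  # 4 partitions

# plain collect() flattens everything into one list
rdd1.collect()         # [0, 1, 2, ..., 9]

# glom() keeps partition boundaries: one inner list per partition
rdd1.glom().collect()  # e.g. [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
```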

Sources: How to Parallelize and Distribute Collection in PySpark | by Nutan | Medium and 3 Methods for Parallelization in Spark | by Ben Weber | Towards Data Science

Some Useful Functions

Editing String Variables in PySpark

Combining two string columns into one with Concatenation

The classic example is combining a "first_name" field with a "last_name" field. Use f.concat() or f.concat_ws() from Spark; the latter means concatenating with a separator, i.e. it adds a separator between the two combined fields.
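For example (column names follow the first_name/last_name example above):

```python
from pyspark.sql import functions as f

# plain concatenation, no separator
df = df.withColumn("full_name", f.concat(f.col("first_name"), f.col("last_name")))

# concat_ws: concatenate with a separator between the fields
df = df.withColumn("full_name", f.concat_ws(" ", f.col("first_name"), f.col("last_name")))
```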

Substring to take a part of the string values

Use f.substring(colname, pos, len), where pos is the starting position of the cut you want and len is how many characters you want to scoop up. For example, df.withColumn("location_code", f.substring(f.col("item_key"), 5, 3)) takes 3 characters starting from the 5th one. There are also RegEx functions if you're handy with that: regexp_extract and regexp_replace, in addition to the dataframe method df.colRegex; look that up in the docs, I haven't personally used it.
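A short sketch of the substring example plus the regex variants (the patterns and new column names are illustrative):

```python
from pyspark.sql import functions as f

# take 3 characters starting at position 5 (positions are 1-based)
df = df.withColumn("location_code", f.substring(f.col("item_key"), 5, 3))

# regex alternatives
df = df.withColumn("digits_only", f.regexp_extract(f.col("item_key"), r"(\d+)", 1))
df = df.withColumn("cleaned_key", f.regexp_replace(f.col("item_key"), "-", ""))
```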

An example that combines both concepts, concat and substring:
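The original example isn't reproduced here; a minimal sketch of combining the two (the composite-key idea and column names are hypothetical):

```python
from pyspark.sql import functions as f

# build a composite key from a prefix of item_key plus another column
df = df.withColumn(
    "composite_key",
    f.concat_ws("_", f.substring(f.col("item_key"), 1, 4), f.col("location_code")),
)
```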

Dealing With Date Columns in PySpark

There are great resources out there for dealing with dates in Spark; a couple are listed below: SparkByExamples - date functions and PySpark Date Functions – SQL & Hadoop

Subtracting timestamp columns

Convert the timestamp columns you need to subtract (after lagging them, for example) to LongType before subtracting, either beforehand or inline. Otherwise the difference comes back as an IntervalType, a dictionary-like type holding each time segment of the timestamp and its difference value. Below is a worked code template for taking the difference between two successive rows of time with respect to one ID value.

Recall that f.lead() brings the next row's value into the current row.
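The original template isn't reproduced here; a sketch of the idea (id_col and event_ts are hypothetical column names), using f.lead() over a window and casting to long before subtracting:

```python
from pyspark.sql import functions as f
from pyspark.sql.window import Window

w = Window.partitionBy("id_col").orderBy("event_ts")

df = (
    df.withColumn("next_ts", f.lead("event_ts").over(w))
      # cast to long (epoch seconds) before subtracting,
      # otherwise the result is an interval type
      .withColumn(
          "secs_to_next",
          f.col("next_ts").cast("long") - f.col("event_ts").cast("long"),
      )
)
```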

Source

Join on multiple columns - PySpark

See apache spark sql - Pyspark: Reference is ambiguous when joining dataframes on same column - Stack Overflow. Syntax:
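A sketch of the syntax (df1, df2, and the id columns are placeholders):

```python
joined = (
    df1.join(
        df2,
        on=[df1.idcol == df2.idcol, df1.idcol2 == df2.idcol2],
        how="inner",
    )
    # drop the duplicated key columns coming from df2
    .drop(df2.idcol, df2.idcol2)
)
```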

What goes inside are equality expressions like df1.idcol == df2.idcol. Don't forget to drop one copy of the duplicated key columns, particularly if they have the same name: df1.join(df2, ...).drop(df2.idcol, df2.idcol2, ...)

If the join doesn't accept the dfname.colname format like that, use f.col("df1name.colname") == f.col("df2name.colname") instead.

Union Multiple DataFrames

Important: if you want to union multiple dataframes as you read them, just read them as a list of paths in the spark.read.parquet() command and it will union them at once. This is far more efficient, since unions (like joins) are expensive, and it even leaves the dataframes ready and faster to operate on for subsequent actions.

Resource for this function: https://walkenho.github.io/merging-multiple-dataframes-in-pyspark/
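The helper used on the next line isn't shown on this page; a plausible sketch, assuming it follows the reduce-and-union pattern from that resource (the name read_brunch and the files_lst parameter come from the usage below; everything else is an assumption):

```python
from functools import reduce
from pyspark.sql import DataFrame

def read_brunch(files_lst):
    """Read a list of parquet paths and union them into one dataframe."""
    dfs = [spark.read.parquet(path) for path in files_lst]
    return reduce(DataFrame.unionByName, dfs)
```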

Then, to read the files, you can do: read_brunch(files_lst=files_rx[:3]).printSchema()

NOTE: if you have the same parquet data split over different parts (e.g. updates, newly added customers, etc.), don't use the union above, because, like joins, it's expensive. Instead, read them all at once as a list inside the spark.read.parquet() statement and it will union them all automatically.

Columns and data types of a Spark dataframe, as a dataframe
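The original snippet isn't shown here; one way to do it is to turn df.dtypes (a list of (column, type) tuples) into a dataframe:

```python
# df.dtypes is a list of (column_name, data_type) tuples
schema_df = spark.createDataFrame(df.dtypes, ["column_name", "data_type"])
schema_df.show(truncate=False)
```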

Get row values to an ArrayType column

This function (f.collect_list()) can also be used inside the aggregation in a groupBy().agg() statement.

There's also f.collect_set() to capture only unique values.
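For example (id_col and value_col are placeholders):

```python
from pyspark.sql import functions as f

grouped = df.groupBy("id_col").agg(
    f.collect_list("value_col").alias("all_values"),    # keeps duplicates
    f.collect_set("value_col").alias("unique_values"),  # unique values only
)
```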

Converting a dataframe from Python to Scala, and back

You can pass variables (or data frames) from Scala to Python, and vice versa, using %python or %scala cells in the DataBricks notebook, by registering the data frame you want as a SQL table.

See apache spark - Zeppelin: Scala Dataframe to python - Stack Overflow. In a Scala cell in the DataBricks notebook, register the data frame as a temporary view df, with this:
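Presumably something like the following (the view name df comes from the text; the %scala magic is only needed if the notebook's default language isn't Scala):

```
%scala
df.createOrReplaceTempView("df")
```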

Now read it in the Python cell:
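A minimal sketch of the Python side:

```python
# read the temporary view registered from the Scala cell
df_py = spark.table("df")
# or equivalently
df_py = spark.sql("SELECT * FROM df")
```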

Or, if you're going from Python to, say, SQL, you can read the temp view from a SQL cell. In a Zeppelin notebook, you can use the magic command with either a single or double %, depending on the notebook provider.

The answer linked above, https://stackoverflow.com/a/35720597/11381214, has other options for doing this as well.

The below didn't work: passing a df from Python to Scala

Passing a df from Scala to Python

Changing it to a SQL dataframe allows you to plot it in PySpark/Python if it's not natively a PySpark/Python DF --> see https://stackoverflow.com/questions/45884823/pass-variables-from-scala-to-python-in-databricks.

Optimizing Spark - Run a lot of jobs by optimizing shuffle size

Either one of those suffices when skipping jobs; both do the same thing. Of course, you can choose the number of partitions; usually you want it to be a multiple of the number of cores you have (each node has one or more cores). According to the instructor of DataBricks' optimization course, DataBricks' Adaptive Query Execution optimizes most of this automatically.
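The exact commands the note refers to aren't shown on this page; presumably something along these lines (the partition count is just an example value):

```python
# set the number of shuffle partitions via the runtime config
spark.conf.set("spark.sql.shuffle.partitions", 64)

# or via SQL; both set the same property
spark.sql("SET spark.sql.shuffle.partitions = 64")
```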

DataBricks-specific commands to configure shuffle size
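No snippet survived here either; on DataBricks with Adaptive Query Execution enabled, the shuffle partition count can reportedly be left to the engine, e.g.:

```python
# let AQE pick the shuffle partition count (DataBricks-specific value "auto")
spark.conf.set("spark.sql.shuffle.partitions", "auto")

# AQE itself is controlled by this standard Spark setting
spark.conf.set("spark.sql.adaptive.enabled", "true")
```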

Importing your own library to run on a Spark cluster Zeppelin notebook

In DataBricks, you can copy a notebook's absolute path to run it elsewhere. Thus, you can create a notebook containing all the functions you use frequently and run it from another notebook to import all the objects in it. You run a DataBricks notebook with the command %run "notebook path in DataBricks workspace"; nothing else can be in that cell, not even a comment.
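For example (the path is hypothetical):

```
%run "/Shared/my_helper_functions"
```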
