Operations on Multiple Columns at Once

This section covers applying the same transformation function to multiple columns at once in PySpark, as well as applying different aggregation functions to different columns at once.

Applying Same Function to Multiple Columns

Example: To lower-case column names:

import pyspark.sql.functions as f

df = df.select(*[f.col(c).alias(c.lower()) for c in df.columns])
# or
lowercase_cols = [c.lower() for c in df.columns]
df = df.toDF(*lowercase_cols)

To lower-case the row values themselves, use PySpark's f.lower() function. You can also use a .select statement with a mapping, as shown in the StackOverflow answer below.

Resources: lower values in each row, but not column names; performing operations on multiple columns in a PySpark dataframe - Medium

Change Type of Multiple Columns at Once

# see how many columns we have of each type in the df
from collections import Counter
Counter([col[1] for col in df.dtypes])

# convert all decimal types to double
# df.dtypes returns a list of (colname, type) tuples
decimal_cols = [col[0] for col in df.dtypes if col[1].startswith("decimal")]

for c in decimal_cols:
    df = df.withColumn(c, f.col(c).cast("double"))

# make sure it worked
print(Counter([col[1] for col in df.dtypes]))

source: change types of multiple columns - StackOverflow

Aggregating Different Columns w.r.t. Different Functions

First, create a list of the aggregation expressions you want for each group of columns.

Notes

  • The select statement is not necessary, but it is good practice to optimize your query: you pass along a smaller dataframe, which makes operations faster. In fact, if your dataframe is too big, whether in row count or column count, the operation may fail altogether if a select clause isn't used.

  • the asterisk you see whenever we pass a list to a function like select or agg is needed so that Spark operates on the list element by element. Without it, Spark would receive the list as a single object rather than reading out its elements on its own. Similar to :_* in Scala.

Sources and other ways to do it: aggregate multiple columns with multiple functions in PySpark - Blogspot

Another Way, the quick and dirty

The following syntax makes it easy to aggregate one column with different aggregation functions, but it doesn't allow aliasing, which might create problems later on if you want to do further analysis on the result or reference it; the reason is that the name of the output column takes the format function(columnname), e.g. count(col1).

where cols is a list of columns you want to groupBy.

CountDistinct example

PySpark syntax

Spark Scala syntax

Note:

  • adding a select clause before grouping or aggregating is always a good idea when you have a big dataframe, whether it's big in row count or column count. Lowering the size of the data to process speeds up your calculations. StackOverflow post

Counting distinct values for multiple columns, and getting those distinct values all at once.

Summing horizontally, i.e. summing row-wise

That is, summing one row across multiple columns, e.g. counting sales or medical diagnoses for one row in the dataframe.

The core of it:

The algorithm,

  1. Binarize the columns: if a value is null, make it 0; if there is a non-null value, make it 1.

  2. Combine the 1/0 values from all binarized columns into one ArrayType column you create.

  3. Sum up the elements of that ArrayType column, giving the count of fields that hold a value in each row. You can of course skip the binarizing step if you want to sum up the actual values in each column instead.

NOTES

  • cond1 is a SQL expression; the formula inside it is explained in the Scala section of this book.

  • I add a backslash at the end of PySpark commands so I can continue the command on the next line, breaking it up to make it easier to read.

  • the asterisk * tells the Spark function to operate element-wise on the list fed to it, similar to :_* in Scala.

  • you can choose to put the elements in a list first with f.collect_list(f.col("colname")) and then call f.size() on the column you created, but in that case null values, and spaces instead of nulls, will give you a headache as they would count. You can try circumventing that by replacing nulls with a distinct value before you create the array column, then using f.array_remove() on the array column created by collect_list or collect_set (distinct values), and finally using f.size. Experiment with that; I haven't successfully done it this way. Search online for "removing null values from an array column Spark". Someone suggested df.withColumn('vec_comb_clean', f.expr("FILTER(vec_comb_clean, x -> x IS NOT NULL)")), though that didn't work as intended in my case for some reason; it would be more efficient if it did. Try it and verify on your own.

Categorical Variables Exploration

A very inefficient and cumbersome way to get the categories of each column per class of the label column, plus the top 10 categories, and the percentage of nulls of each categorical column w.r.t. each label.

Since this function performs a heavy computation, I'll run it outside the function below and feed its result to the next function: count0, count1 = get_counts_per_label(colname=colname, df=df, labelcol="label_column")

Next, create the dictionary of columns and their most frequent levels, accounting for both target labels individually.

Save the dictionary

To open it later

To save it as a Pandas df and then CSV,
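The three save/load steps above might look like this, with pickle for the dictionary and pandas for the CSV; the file names and a temp directory are assumptions:

```python
import os
import pickle
import tempfile

import pandas as pd

top_levels = {"city": ["NY", "LA"], "color": ["red"]}  # example dictionary
workdir = tempfile.mkdtemp()

# save the dictionary
with open(os.path.join(workdir, "top_levels.pkl"), "wb") as fh:
    pickle.dump(top_levels, fh)

# open it later
with open(os.path.join(workdir, "top_levels.pkl"), "rb") as fh:
    loaded = pickle.load(fh)

# save it as a pandas df, then CSV
pdf = pd.DataFrame(list(loaded.items()), columns=["column", "top_levels"])
pdf.to_csv(os.path.join(workdir, "top_levels.csv"), index=False)
```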

Using the dictionary to have a "rare" bucket of each categorical column

Saving the dataframe:

NOTES: I used f.row_number instead of f.rank to get the top n categories. Here's the difference: difference between row number, rank, and dense rank; see also this StackOverflow suggestion.

Crude version: Fully describing categorical variables

This version does not take into account the counts of nulls w.r.t. the different classes of the label (target) variable, nor the frequency of each level within the categorical variable. It gets the number of levels in each variable, the percentage of nulls in each, and the first 10 values of the distinct levels.

Multi Condition Column

Suppose we have a condition on when a bill occurs and when we pay it out, according to this schedule: billed on the 1st of the month through the 4th, paid on the 9th; billed on the 5th through the 8th, paid on the 10th; ...; billed on the 25th through the end of the month, paid on the 1st of the next month.

Extracting the month of a date column

I'm going to extract the day and year as well, thinking of edge cases like a bill arriving on the last day of the year and having to be paid in the next month of the next year.

Creating the condition in SQL expression

The structure:

That's how we do it in SQL. Now you apply it with f.expr() in a .withColumn() statement.

Here's how we apply the example above. Note that BETWEEN is inclusive of both endpoints.

I skipped a couple of intervals, but you get the idea. The last one, anything after the 25th being paid on the first of the next month, is covered in the ELSE clause.

Finally, we apply it using f.expr within a .withColumn statement:

df2 = df1.withColumn("day_of_pay", f.expr(cond))

Fixing the issues

First, we need a condition to move us to the next month. You can add another condition for the year: if the month is 12 and day_of_bill is 31, roll over to the next year.

Condition for the year end

We end up with months that are 13; we need to fix that.
