Operations on Multiple Columns at Once

This section covers applying the same transformation function to multiple columns at once in PySpark, and applying different aggregation functions to different columns at once.

Applying Same Function to Multiple Columns

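Example: to lower-case the values in every column (meant for string columns), while keeping the column names as they are:

import pyspark.sql.functions as f

df= df.select(*[f.lower(f.col(c)).alias(c) for c in df.columns])

To lower-case the column names instead, leaving the values untouched:

lowcase_cols= [item.lower() for item in df.columns]
df= df.toDF(*lowcase_cols)

The first snippet applies the f.lower() function from PySpark to each column inside a .select statement, as shown in the StackOverflow answer below.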

Resources: lower values in each row, but not column names - StackOverflow; performing operations on multiple columns in a PySpark dataframe - Medium

Change Type of Multiple Columns at Once

# see how many columns we have of each type in the df
from collections import Counter
Counter([col[1] for col in df.dtypes]) 

# convert all decimal columns to double
# df.dtypes returns a list of (colname, type) tuples
decimal_cols= [str(col[0]) for col in df.dtypes if col[1].startswith("decimal")]

for c in decimal_cols:
  df= df.withColumn(c, f.col(c).cast("double"))
	
# make sure it worked
print(Counter([col[1] for col in df.dtypes]))

source: change types of multiple columns - Stackoverflow
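
The prefix match in the filter matters because Spark reports decimal columns together with their precision and scale, for example decimal(10,2). The following is a minimal plain-Python sketch of that filtering step; the `dtypes` list is a hypothetical stand-in for `df.dtypes`, so it runs without a Spark session.

```python
from collections import Counter

# Hypothetical stand-in for df.dtypes: a list of (column_name, type_string) tuples.
dtypes = [
    ("id", "bigint"),
    ("price", "decimal(10,2)"),
    ("tax", "decimal(38,18)"),
    ("name", "string"),
]

# An exact match finds nothing, because the type string carries precision and scale.
exact_matches = [name for name, dtype in dtypes if dtype == "decimal"]

# A prefix match catches every decimal column regardless of precision and scale.
decimal_cols = [name for name, dtype in dtypes if dtype.startswith("decimal")]

# Group all decimal variants under one key when counting types.
type_counts = Counter(dtype.split("(")[0] for _, dtype in dtypes)

print(exact_matches)  # []
print(decimal_cols)   # ['price', 'tax']
print(type_counts)    # Counter({'decimal': 2, 'bigint': 1, 'string': 1})
```

Stripping the part in parentheses before counting is an optional design choice; it keeps differently sized decimals from showing up as separate types in the summary.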

Aggregating Different Columns w.r.t. Different Functions

First, create a list of the aggregation procedures you want for each group of columns

# have a list for what you need to do to each column
cols_to_sum= ["col1", "col2", "col3"]
cols_to_count= ["col4", "col5"]
cols_to_max= ["col6"]

# create a list of those aggregation procedures
aggs= (
    [f.sum(f.col(c)).alias(c) for c in cols_to_sum]
    + [f.count(f.col(c)).alias(c) for c in cols_to_count]
    + [f.max(f.col(c)).alias(c) for c in cols_to_max]
)

# put the aggregations list in the statement
aggd_df= df\
.select(*(["id_column"] + cols_to_sum + cols_to_count + cols_to_max))\
.groupBy("id_column")\
.agg(*aggs)

Notes

  • The select statement is not strictly necessary, but it is good practice: it narrows the dataframe down to the columns you need before aggregating, which can make the operation faster. If your dataframe is very big, whether in row count or column count, the operation may even fail altogether when a select clause isn't used.

  • The asterisk you see whenever we pass a list to a function such as select or agg unpacks the list, so that each element is passed as a separate argument. Without it, the function receives the whole list as a single object. This is similar to :_* in Scala.
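
The effect of the asterisk can be seen without Spark at all. In this small sketch, `select` is a hypothetical stand-in for any method that accepts a variable number of arguments; it is not the PySpark method itself.

```python
def select(*cols):
    """Hypothetical stand-in for a variadic method such as DataFrame.select."""
    return list(cols)


columns = ["id_column", "col1", "col2"]

# Without the asterisk, the function receives one argument: the list itself.
as_one_argument = select(columns)

# With the asterisk, the list is unpacked into three separate arguments.
as_separate_arguments = select(*columns)

print(len(as_one_argument))        # 1
print(len(as_separate_arguments))  # 3
```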

Sources and other ways to do it: blogspot - aggregate multiple columns with multiple functions pyspark

Another Way: Quick and Dirty

The following syntax makes it easy to aggregate one column with several aggregation functions. As written, it does not alias the results, which might create problems later on if you want to reference them in further analysis, because each output column is named in the format function(columnname), e.g. count(col1).

aggs= [f.max, f.min, f.avg, f.stddev, f.count]
res_columns= [v("colname") for v in aggs]
df1= df\
.groupBy(*cols).agg(*res_columns)

where cols is a list of columns you want to groupBy.
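
If readable output names are needed, one possible workaround is to keep the function names as strings and derive an alias from each. The sketch below only builds the names in plain Python; the `colname_function` naming pattern is an assumption, and the Spark call in the trailing comment is shown for orientation rather than executed.

```python
# Function names kept as strings so they can be reused when building aliases.
agg_names = ["max", "min", "avg", "stddev", "count"]
colname = "colname"

# Assumed naming pattern: <column>_<function>, which is easier to reference later.
alias_names = [f"{colname}_{fn}" for fn in agg_names]

print(alias_names)
# ['colname_max', 'colname_min', 'colname_avg', 'colname_stddev', 'colname_count']

# With Spark available, the aliased expressions could then be built as:
#   res_columns = [getattr(f, fn)(colname).alias(alias)
#                  for fn, alias in zip(agg_names, alias_names)]
```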

CountDistinct example

PySpark syntax

cols_to_select= ["col1","col2", "col3"]
df\
.select(*cols_to_select)\
.agg(*(f.countDistinct(f.col(item)).alias(item) for item in cols_to_select))

Spark Scala syntax (note that this one uses the approximate distinct count)

val exprs= df.columns.map((_ -> "approx_count_distinct")).toMap
df.agg(exprs).show       

Note:

  • Adding a select clause before grouping or aggregating is a good idea when you have a big dataframe, whether it's big in row count or column count. Reducing the amount of data to process can speed up your calculations. See the StackOverflow post.

Counting distinct values for multiple columns, and getting those distinct values all at once.

df\
.select(*list_of_columns)\
.agg(*[f.collect_set(f.col(item)) for item in list_of_columns], 
*[f.countDistinct(f.col(item)) for item in list_of_columns])\
.display()

Summing horizontally, i.e. summing row-wise

That is, summing one row across multiple columns, e.g. counting sales or medical diagnoses for one row in the dataframe.

The core of it:

import pyspark.sql.functions as f
# AGGREGATE takes the array column, a start value, and a merge function
summing_cond="AGGREGATE(arraycolname, 0, (x,y) -> x+y)"
df=df.withColumn("arraycolSum", f.expr(summing_cond))
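
The AGGREGATE expression is a fold: it starts from the initial value and merges in one array element at a time. A rough plain-Python analogue, using `functools.reduce` on a made-up array for a single row, looks like this:

```python
from functools import reduce


def aggregate(array, start, merge):
    """Plain-Python analogue of AGGREGATE(array, start, (acc, x) -> merge(acc, x))."""
    return reduce(merge, array, start)


# Hypothetical contents of the array column for one row.
row_array = [1, 0, 1, 1]

array_sum = aggregate(row_array, 0, lambda acc, x: acc + x)
print(array_sum)  # 3

# An empty array simply returns the start value.
print(aggregate([], 0, lambda acc, x: acc + x))  # 0
```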

The algorithm:

  1. Binarize the columns. That is, if a value is null make it 0; if there is any value other than null, make it 1.

  2. Combine the 1/0 values from all binarized columns into one ArrayType column that you create.

  3. Sum up the elements of that ArrayType column, which gives the count of fields that hold a value in each row. You can of course skip the binarizing step if you just want to sum up the actual values in each column.

import pyspark.sql.functions as f
from pyspark.sql.types import *

cols_to_sum_horizontally= [...]
# if null:0, else 1
for item in cols_to_sum_horizontally:
	df= df\
	.withColumn("{}_int".format(item), 
	(f.when(f.col(item).isNotNull(), 1).otherwise(0)).cast("int") )\
	.drop(item)
	
# create an array column of the 1/0 value
df= df\
.withColumn("sales_", f.array(*[k for k in df.columns if k.endswith("_int")]))

# sum the 1/0 array column, to calculate count
cond1= "AGGREGATE(sales_, 0, (x,y) -> x+y)"

df= df\
.withColumn("sales_count", f.expr(cond1))

NOTES

  • cond1 is a SQL expression. The formula inside it is explained in the Scala section of this book.

  • I add a backslash to the end of each line of a PySpark command so that the command continues on the next line. This breaks it up and makes it easier to read.

  • The asterisk * unpacks the list fed to the Spark function, so that each element is passed as a separate argument. Similar to :_* in Scala.

  • You can choose to put the elements in a list first, with f.collect_list(f.col("colname")), and then call f.size() on the column you created, but in this case null values, and spaces instead of nulls, will give you a headache as they would count. You can try circumventing that by replacing nulls with a distinct value before you create the array column, then using f.array_remove() on the newly created array column from collect_list or collect_set (distinct values), and then using f.size(). Experiment with that; I haven't successfully done it this way. Search online for "removing null values from an array column Spark". Someone suggested df.withColumn('vec_comb_clean', f.expr("FILTER(vec_comb_clean, x -> x IS NOT NULL)")), though that didn't work as intended in my case for some reason; it would be more efficient if it did. Try it and verify on your own.

Categorical Variables Exploration

A very inefficient and cumbersome way to get the categories of each column per class of the label column, plus the top 10 categories and the percentage of nulls of each categorical column w.r.t. each label.

def get_counts_per_label(colname, df, labelcol="label_column"):
	"""used internally in the next function"""
	tup= df\
	.select(colname, labelcol)\
	.groupBy(labelcol)\
	.agg(f.count(f.col(colname)).alias('count'))\
	.collect()
	
	count0= [row['count'] for row in tup if row[labelcol]==0][0]
	count1= [row['count'] for row in tup if row[labelcol]==1][0]
	
	return count0, count1

Since this function performs a heavy process, I run it outside the function below and feed its result to that function:

count0, count1= get_counts_per_label(colname=colname, df=df, labelcol="label_column")

def get_top_n_per_label(colname, df, count0, count1, labelcol="label_column", top=10):
	"""Top n categories of `colname` per label, plus the cumulative sum of their ratios.
	Results are returned in order, per label. `top` is the number of top categories wanted. 
	returns a dictionary {colname: (cumu_sum for class0, cumu_sum for class1, top n categories)}"""
	
	from pyspark.sql.window import Window
	# labelcol is now a parameter; it was used here without being defined
	win1= Window.partitionBy(labelcol).orderBy(f.col('ratio').desc())
	
	temp= df\
	.select(colname, labelcol)\
	.groupBy(colname, labelcol)\
	.agg(f.count(f.col(colname)).alias("count"))\
	.orderBy(labelcol, f.col("count").desc())\
	.withColumn('ratio', 
		f.when(f.col(labelcol)==0, f.col("count")/count0)
		.when(f.col(labelcol)==1, f.col("count")/count1)
		.otherwise(None) )
		
	temp1=temp\
	.select('*', f.row_number().over(win1).alias("row_number"))\
	.drop("count")\
	.filter(f.col("row_number")<=top)\
	.withColumn("cumu_sum", f.sum(f.col("ratio")).over(win1))
	
	temp2=temp1.collect()
	
	top_n_categories= list(set([row[colname] for row in temp2]))
	
	temp3_0=[round(row['cumu_sum'], 3) for row in temp2 if (row[labelcol]==0) and (row['row_number']==top)][0]
	temp3_1=[round(row['cumu_sum'], 3) for row in temp2 if (row[labelcol]==1) and (row['row_number']==top)][0]
	
	summ_dict= {colname: (temp3_0, temp3_1, top_n_categories)}
	
	return summ_dict

Next, create the dictionary of columns and their most frequent levels, accounting for both target labels individually.

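most_frequent=dict()
for item in categorical_columns:
	count0, count1= get_counts_per_label(colname=item, df=df, labelcol="label_column")
	# get_top_n_per_label already returns {colname: (...)}, so merge it in directly
	most_frequent.update(get_top_n_per_label(colname=item, df=df, count0=count0, count1=count1))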

Save the dictionary

import json
with open("path/most_frequent.json", "w") as file:
	json.dump(most_frequent, file)

To open it later

import json
with open("path/most_frequent.json", 'r') as file:
	most_frequent= json.load(file)
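
One detail to keep in mind when reloading: JSON has no tuple type, so the tuples stored as dictionary values come back as lists. A minimal sketch with a made-up entry (the values are illustrative only):

```python
import json

# Hypothetical entry shaped like the dictionary values: two ratios and a list of categories.
most_frequent = {"colname": (0.91, 0.87, ["a", "b", "c"])}

# Round trip through JSON, as json.dump followed by json.load would do.
restored = json.loads(json.dumps(most_frequent))

print(type(most_frequent["colname"]).__name__)  # tuple
print(type(restored["colname"]).__name__)       # list
print(restored["colname"][2])                   # ['a', 'b', 'c']
```

Indexing by position works the same way before and after the round trip, so code that reads element 2 for the category list is unaffected.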

To save it as a Pandas df and then CSV,

import pandas as pd

# if you're on DataBricks, display() renders the dataframe and you can download the displayed version to your local machine as CSV
display(pd.DataFrame.from_dict(most_frequent))

#OR
levels_df= pd.DataFrame.from_dict(most_frequent).T
levels_df.reset_index(drop=False, inplace=True)
# these column names follow the tuple layout of description_dict from the crude version below
levels_df= levels_df.rename(columns={"index":"variable", 0:"levels_count", 1:'null_percent', 2:'levels_sample'})
display(levels_df) # now you can download it to your local machine as CSV, if on DataBricks. 

Using the dictionary to create a "rare" bucket for each categorical column

df1=df
for item in most_frequent.keys():
	# element 2 of each dictionary value holds the list of top categories
	df1=df1\
	.withColumn(f"{item}_short", 
		f.when(f.col(item).isin(most_frequent[item][2]), f.col(item))
		.otherwise("rare"))
	df1= df1.drop(item)

Saving the dataframe:

df1.write.mode("overwrite").option("header", "true").format("parquet").save("path_to_save_with_file_name_you_want")

NOTES: I used f.row_number instead of f.rank to get the top n categories. For the difference between the three, see the StackOverflow discussion "difference between row number, rank, and dense rank".
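
The difference shows up only when there are ties. The plain-Python sketch below mimics the three numbering schemes on a made-up list of counts that is already sorted in descending order; it illustrates the semantics and does not call Spark.

```python
# Hypothetical category counts, sorted in descending order, with a tie at 30.
counts = [50, 30, 30, 10]

# row_number: strictly sequential, ties get different numbers.
row_number = list(range(1, len(counts) + 1))

# rank: ties share the position of their first occurrence, leaving a gap afterwards.
rank = [counts.index(c) + 1 for c in counts]

# dense_rank: ties share a number and no gap is left.
distinct = sorted(set(counts), reverse=True)
dense_rank = [distinct.index(c) + 1 for c in counts]

print(row_number)  # [1, 2, 3, 4]
print(rank)        # [1, 2, 2, 4]
print(dense_rank)  # [1, 2, 2, 3]
```

Filtering on row_number <= n therefore returns at most n rows per partition, while the same filter on rank or dense_rank can return more when ties occur.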

Crude version: Fully describing categorical variables

This version does not take into account the counts of nulls w.r.t. the different classes of the label variable (target variable), nor the frequency of each level within the categorical variable. It gets the number of levels in each variable, the percentage of nulls in it, and the first 10 values of its distinct levels.

df.persist() # essential or it will break below

df_count= df.count()

description_dict= dict()
for item in categorical_variables:
	#levels of the categoricals, as a list
	t0=[row[item] for row in df.select(item)\
	.filter(f.col(item).isNotNull())\
	.distinct()\
	.collect()]
	
	# number of levels in the string variables
	t1= len(t0)
	
	# first 10 values that are not Null 
	t2= sorted(t0)[:10]
	
	# number of Nulls
	t3= df.filter(f.col(item).isNull()).count()
	
	description_dict.update({item: (t1, round(t3/df_count, 2), t2)})

Multi Condition Column

Suppose we have a rule for when a bill occurs and when we pay it, according to this schedule:

  • billed on the 1st through the 4th: paid on the 9th

  • billed on the 5th through the 8th: paid on the 10th

  • ...

  • billed on the 25th through the end of the month: paid on the 1st of the next month

Extracting the month of a date column

I'm going to extract the day and year as well, thinking of edge cases like a bill arriving on the last day of the year and having to be paid in the first month of the next year.

df2= df\
.withColumn("day_of_bill", f.dayofmonth(f.col("bill_date")))\
.withColumn("month_of_pay", f.month(f.col("bill_date")))\
.withColumn("year_of_pay", f.year(f.col("bill_date")))

Creating the condition in SQL expression

The structure:

cond= """ CASE WHEN ... THEN ... WHEN ... THEN ... ELSE ... END """

That's how we do it in SQL. Now apply it with f.expr() in a .withColumn() statement.

Here's how we apply it to the example above. Note that BETWEEN is inclusive of both endpoints.

cond= """CASE
    WHEN day_of_bill BETWEEN 1 AND 4 THEN 9
    WHEN day_of_bill BETWEEN 5 AND 8 THEN 10
    WHEN day_of_bill BETWEEN 9 AND 12 THEN 17
    WHEN day_of_bill BETWEEN 21 AND 24 THEN 26
    ELSE 1 END"""

I skipped a couple of intervals, but you get the idea. The last one, anything from the 25th onward being paid on the first of the next month, is covered by the ELSE clause.

Finally, we apply it using f.expr within a .withColumn statement:

df2= df2.withColumn("day_of_pay", f.expr(cond))
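
To sanity-check the boundaries before running this on a dataframe, the same CASE logic can be mirrored in plain Python. The function below is a hypothetical helper that follows the expression exactly as written, including the skipped intervals.

```python
def day_of_pay(day_of_bill: int) -> int:
    """Mirror of the CASE expression; BETWEEN is inclusive on both ends."""
    if 1 <= day_of_bill <= 4:
        return 9
    if 5 <= day_of_bill <= 8:
        return 10
    if 9 <= day_of_bill <= 12:
        return 17
    if 21 <= day_of_bill <= 24:
        return 26
    # ELSE branch: every remaining day, including the intervals skipped in the text
    return 1


print(day_of_pay(4))   # 9
print(day_of_pay(5))   # 10
print(day_of_pay(28))  # 1
```

Because the skipped intervals also fall through to the ELSE branch here, days 13 through 20 return 1 until their own WHEN clauses are added.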

Fixing the issues

First, we need a condition that moves us to the next month. You can add another condition for the year, for the case where the month is 12 and day_of_bill is 31.

# apply to the new month of pay column
month_cond= """CASE WHEN day_of_pay ==1 THEN month_of_pay + 1 ELSE month_of_pay END """

Condition for the year end

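# apply it to the new year of pay column, based on the new month of pay column that we're adding later
# only a month of 13 (December + 1) rolls into the next year; a payment made in December itself stays in the same year
year_cond= """CASE WHEN new_month_of_pay > 12 THEN year_of_pay + 1 ELSE year_of_pay END"""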

We end up with months equal to 13, which we need to fix:

df2= df2\
.withColumn("new_month_of_pay", f.expr(month_cond))\
.withColumn("new_year_of_pay", f.expr(year_cond))\
.withColumn("new_mop", f.when(f.col("new_month_of_pay")>12, 1).otherwise(f.col("new_month_of_pay")))
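
The rollover rule can be checked in plain Python on a few edge cases. This is a minimal sketch with a hypothetical helper, assuming that a day_of_pay of 1 always means the following month and that the year advances only when the incremented month exceeds 12.

```python
def roll_over(month_of_pay: int, year_of_pay: int, day_of_pay: int):
    """Return (month, year) of payment; a day_of_pay of 1 means the following month."""
    new_month = month_of_pay + 1 if day_of_pay == 1 else month_of_pay
    if new_month > 12:
        # month 13 is January of the next year
        return 1, year_of_pay + 1
    return new_month, year_of_pay


print(roll_over(12, 2023, 1))   # (1, 2024)
print(roll_over(12, 2023, 10))  # (12, 2023)
print(roll_over(11, 2023, 1))   # (12, 2023)
```

The second and third cases are the ones worth watching: a payment that lands in December must keep its original year.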
