Applying the same transformation function to multiple columns at once in PySpark, or applying different aggregation functions to different columns at once.
Applying Same Function to Multiple Columns
Example: To lower-case column names:
import pyspark.sql.functions as f
df = df.select(*[f.col(c).alias(c.lower()) for c in df.columns])
# or
lowercase_cols = [item.lower() for item in df.columns]
df = df.toDF(*lowercase_cols)
To lower-case row values, use the f.lower() function from PySpark. You can also use a .select statement with a mapping, as shown in the StackOverflow answer below.
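For example, a minimal sketch that lower-cases the values of every string column while keeping the column names (assuming the dataframe is named df):
import pyspark.sql.functions as f
string_cols = [c for c, t in df.dtypes if t == "string"]
df = df.select(*[f.lower(f.col(c)).alias(c) if c in string_cols else f.col(c) for c in df.columns])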
Resources:
Change Type of Multiple Columns at Once
# see how many columns we have of each type in the df
import pyspark.sql.functions as f
from collections import Counter
Counter([col[1] for col in df.dtypes])
# convert all decimal types to DoubleType
# df.dtypes returns a list of (colname, type) tuples
decimal_cols = [str(col[0]) for col in df.dtypes if col[1].startswith("decimal")]
for c in decimal_cols:
    df = df.withColumn(c, f.col(c).cast("double"))
# make sure it worked
print(Counter([col[1] for col in df.dtypes]))
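Alternatively, a sketch of the same casts done in a single select instead of repeated withColumn calls:
df = df.select(*[f.col(c).cast("double").alias(c) if c in decimal_cols else f.col(c) for c in df.columns])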
Aggregating Different Columns w.r.t. Different Functions
First, create a list of the aggregation procedures you want for each group of columns.
# have a list for what you need to do to each column
cols_to_sum= ["col1", "col2", "col3"]
cols_to_count= ["col4", "col5"]
cols_to_max= ["col6"]
# create a list of those aggregation procedures
aggs = [f.sum(f.col(c)).alias(c) for c in cols_to_sum] + \
       [f.count(f.col(c)).alias(c) for c in cols_to_count] + \
       [f.max(f.col(c)).alias(c) for c in cols_to_max]
# put the aggregations list in the statement
aggd_df = df\
    .select(*(["id_column"] + cols_to_sum + cols_to_count + cols_to_max))\
    .groupBy("id_column")\
    .agg(*aggs)
Notes
The select statement is not necessary, but it is good practice to optimize your query. That is, you will be passing a smaller dataframe, which makes the operations faster. In fact, if your dataframe is big enough, whether in row count or column count, the operation may fail altogether if a select clause isn't used.
The asterisk you see whenever we pass a list to a function like select or agg unpacks the list, so Spark receives the elements as separate arguments rather than the list as a single object. It is similar to :_* in Scala; see the small example below.
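A quick illustration of that unpacking, using the hypothetical column names from above:
cols = ["col1", "col2"]
df.select(*cols)   # equivalent to df.select("col1", "col2")
exprs = [f.sum(f.col(c)).alias(c) for c in cols]
df.groupBy("id_column").agg(*exprs)   # agg receives each Column as a separate argument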
Another Way: the quick and dirty
The following syntax makes it easy to aggregate one column with several aggregation functions, but it doesn't allow aliasing, which might create problems later on if you want to do further analysis or reference the results; the output column names will be of the format function(columnname), e.g. count(col1).
aggs = [f.max, f.min, f.avg, f.stddev, f.count]
res_columns = [v("colname") for v in aggs]
df1 = df\
    .groupBy(*cols)\
    .agg(*res_columns)
where cols is a list of columns you want to groupBy.
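If you need predictable names downstream, one possible workaround (a sketch, assuming the function(columnname)-style names produced above) is to rename the aggregated columns afterwards:
# rename every aggregated column of the form function(colname) to colname_function
for old_name in df1.columns:
    if "(" in old_name:
        func, _, col = old_name.partition("(")
        df1 = df1.withColumnRenamed(old_name, f"{col.rstrip(')')}_{func}")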
CountDistinct example
PySpark syntax
cols_to_select = ["col1", "col2", "col3"]
df\
    .select(*cols_to_select)\
    .agg(*(f.countDistinct(f.col(item)).alias(item) for item in cols_to_select))
Spark Scala syntax
val exprs= df.columns.map((_ -> "approx_count_distinct")).toMap
df.agg(exprs).show
Note: the following counts distinct values for multiple columns, and gets those distinct values, all at once.
df\
    .select(*list_of_columns)\
    .agg(*[f.collect_set(f.col(item)) for item in list_of_columns],
         *[f.countDistinct(f.col(item)) for item in list_of_columns])\
    .display()
Summing horizontally, i.e. summing row-wise
That is, summing one row across multiple columns, e.g. counting sales or medical diagnoses for one row in the dataframe.
The core of it:
import pyspark.sql.functions as f
summing_cond = "AGGREGATE(arraycolname, 0, (x, y) -> x + y)"
df = df.withColumn("arraycolSum", f.expr(summing_cond))
The algorithm:
Binarize the columns. That is, if a value is null make it a zero; if there's a value other than null, make it a 1.
Combine the 1/0 values from all the binarized columns into one ArrayType column you create.
Sum up the elements of that ArrayType column, thus finding the count of existing values in each row. You can of course skip the binarizing step if you just want to sum up the actual values in each column.
import pyspark.sql.functions as f
from pyspark.sql.types import *
cols_to_sum_horizontally = [...]
# if null: 0, else 1
for item in cols_to_sum_horizontally:
    df = df\
        .withColumn("{}_int".format(item),
                    (f.when(f.col(item).isNotNull(), 1).otherwise(0)).cast("int"))\
        .drop(item)
# create an array column of the 1/0 values
df = df\
    .withColumn("sales_", f.array(*[k for k in df.columns if k.endswith("_int")]))
# sum the 1/0 array column, to calculate the count
cond1 = "AGGREGATE(sales_, 0, (x, y) -> x + y)"
df = df\
    .withColumn("sales_count", f.expr(cond1))
NOTES
cond1 is a SQL expression; the formula inside it is explained in the Scala section of this book.
I add a backslash to the end of PySpark commands so I can continue the command on the next line, breaking it up to make it easier to read.
The asterisk * unpacks the list fed to the Spark function, so it receives the elements as separate arguments. Similar to :_* in Scala.
You can choose to put the elements in a list first with f.collect_list(f.col("colname")) and then call f.size() on the column you created, but in this case null values, and spaces instead of nulls, will give you a headache because they would be counted. You can try circumventing that by replacing nulls with a distinct value before you create the array column, then using f.array_remove() on the newly created array column from collect_list or collect_set (distinct values), and finally using f.size(). Experiment with that; I haven't successfully done it this way. Search online for "removing null values from an array column Spark". Someone suggested df.withColumn('vec_comb_clean', f.expr("FILTER(vec_comb_clean, x -> x IS NOT NULL)")), though that didn't work as intended in my case for some reason; it would be more efficient if it did. Try it and verify on your own.
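A sketch of that alternative, untested as noted above; group_id and colname are hypothetical names, and the FILTER expression also drops blank strings to address the spaces issue:
import pyspark.sql.functions as f
df_counts = df\
    .groupBy("group_id")\
    .agg(f.collect_list(f.col("colname")).alias("vals"))\
    .withColumn("vals_clean", f.expr("FILTER(vals, x -> x IS NOT NULL AND trim(x) != '')"))\
    .withColumn("vals_count", f.size(f.col("vals_clean")))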
Categorical Variables Exploration
A very inefficient and cumbersome way to get the categories of each column per class of the label column, plus the top 10 categories, and the percentage of nulls of each categorical column w.r.t. each label.
def get_counts_per_label(colname, df, labelcol="label_column"):
    """used internally in the next function"""
    tup = df\
        .select(colname, labelcol)\
        .groupBy(labelcol)\
        .agg(f.count(f.col(colname)).alias('count'))\
        .collect()
    count0 = [row['count'] for row in tup if row[labelcol] == 0][0]
    count1 = [row['count'] for row in tup if row[labelcol] == 1][0]
    return count0, count1
Since this function performs a heavy operation (a collect), I run it outside the function below and feed its results to the next function:
count0, count1 = get_counts_per_label(colname=colname, df=df, labelcol="label_column")
def get_top_n_per_label(colname, df, count0, count1, top=10, labelcol="label_column"):
    """Plus the cumulative sum of the top n ratios, per label; results are returned in order.
    `top` is the number of top categories wanted.
    Returns a dictionary {colname: (cumu_sum for class 0, cumu_sum for class 1, top n categories)}"""
    from pyspark.sql.window import Window
    win1 = Window.partitionBy(labelcol).orderBy(f.col('ratio').desc())
    temp = df\
        .select(colname, labelcol)\
        .groupBy(colname, labelcol)\
        .agg(f.count(f.col(colname)).alias("count"))\
        .orderBy(labelcol, f.col("count").desc())\
        .withColumn('ratio',
                    f.when(f.col(labelcol) == 0, f.col("count")/count0)
                     .when(f.col(labelcol) == 1, f.col("count")/count1)
                     .otherwise(None))
    temp1 = temp\
        .select('*', f.row_number().over(win1).alias("row_number"))\
        .drop("count")\
        .filter(f.col("row_number") <= top)\
        .withColumn("cumu_sum", f.sum(f.col("ratio")).over(win1))
    temp2 = temp1.collect()
    top_n_categories = list(set([row[colname] for row in temp2]))
    temp3_0 = [round(row['cumu_sum'], 3) for row in temp2 if (row[labelcol] == 0) and (row['row_number'] == top)][0]
    temp3_1 = [round(row['cumu_sum'], 3) for row in temp2 if (row[labelcol] == 1) and (row['row_number'] == top)][0]
    summ_dict = {colname: (temp3_0, temp3_1, top_n_categories)}
    return summ_dict
Next, create the dictionary of columns and their most frequent levels, accounting for both target labels individually.
most_frequent = dict()
for item in categorical_columns:
    count0, count1 = get_counts_per_label(colname=item, df=df, labelcol="label_column")
    most_frequent.update(get_top_n_per_label(colname=item, df=df, count0=count0, count1=count1))
Save the dictionary
import json
with open("path/most_frequent.json", "w") as file:
    json.dump(most_frequent, file)
To open it later
import json
with open("path/most_frequent.json", 'r') as file:
    most_frequent = json.load(file)
To save it as a pandas DataFrame and then a CSV:
import pandas as pd
display(pd.DataFrame.from_dict(most_frequent))  # if you're on Databricks, you can download the displayed version to your local machine as CSV
# OR
levels_df = pd.DataFrame.from_dict(most_frequent).T
levels_df.reset_index(drop=False, inplace=True)
levels_df = levels_df.rename(columns={"index": "variable", 0: "levels_count", 1: "null_percent", 2: "levels_sample"})
display(levels_df)  # now you can download it to your local machine as CSV, if on Databricks
Using the dictionary to create a "rare" bucket for each categorical column
df1 = df
for item in most_frequent.keys():
    # the third element of each tuple holds the top n categories
    df1 = df1\
        .withColumn(f"{item}_short",
                    f.when(f.col(item).isin(most_frequent[item][2]), f.col(item))
                     .otherwise("rare"))
    df1 = df1.drop(item)
The following version does not take into account counts of nulls w.r.t. the different classes of the label (target) variable, nor the frequency of each level within the categorical variable. It gets the number of levels in each variable, the percentage of nulls in each, and the first 10 distinct levels.
df.persist()  # essential, or it will break below
df_count = df.count()
description_dict = dict()
for item in categorical_variables:
    # levels of the categorical variable, as a list
    t0 = [row[item] for row in df.select(item)
                                  .filter(f.col(item).isNotNull())
                                  .distinct()
                                  .collect()]
    # number of levels in the string variable
    t1 = len(t0)
    # first 10 values that are not null
    t2 = sorted(t0)[:10]
    # number of nulls
    t3 = df.filter(f.col(item).isNull()).count()
    description_dict.update({item: (t1, round(t3/df_count, 2), t2)})
Multi Condition Column
Suppose we have a condition on when a bill occurs and when we pay it out, according to this schedule: the 1st of the month through the 4th, paid on the 9th; the 5th through the 8th, paid on the 10th; ...; the 25th through the end of the month, paid on the 1st of the next month.
Extracting the month of a date column
I'm also going to extract the day and the year, thinking of edge cases like a bill arriving on the last day of the year, which has to be paid the next month, in the next year.
cond= """ CASE WHEN ... THEN ... WHEN ... THEN ... ELSE ... END """
That's how we do it in SQL. Now you apply it with f.expr() in a .withColumn() statement.
Here's how we apply it to the example above. Note that BETWEEN is inclusive of both endpoints.
cond = """CASE WHEN day_of_bill BETWEEN 1 AND 4 THEN 9
          WHEN day_of_bill BETWEEN 5 AND 8 THEN 10
          WHEN day_of_bill BETWEEN 9 AND 12 THEN 17
          WHEN day_of_bill BETWEEN 21 AND 24 THEN 26
          ELSE 1 END"""
I skipped a couple of intervals, but you get the idea. The last one, anything on or after the 25th being paid on the first of the next month, is covered by the ELSE clause.
Finally, we apply it using f.expr within a .withColumn statement:
df2 = df1.withColumn("day_of_pay", f.expr(cond))
Fixing the issues
First, we need a condition to move us to the next month. You can then add another condition for the year, for the case where the month is 12 and the pay date rolls over into the next year.
# apply to the new month of pay column
month_cond = """CASE WHEN day_of_pay == 1 THEN month_of_pay + 1 ELSE month_of_pay END"""
Condition for the year end
# apply it to the new year of pay column, based on the new month of pay column that we're adding later;
# the year only rolls over when the pay month overflows past December (i.e. becomes 13)
year_cond = """CASE WHEN new_month_of_pay > 12 THEN year_of_pay + 1 ELSE year_of_pay END"""
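A sketch of applying those two conditions with f.expr, continuing from the df2 created above; the order matters because year_cond reads new_month_of_pay:
df3 = df2\
    .withColumn("new_month_of_pay", f.expr(month_cond))\
    .withColumn("new_year_of_pay", f.expr(year_cond))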
We end up with months that are 13; we need to fix that, as shown in the sketch below.
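A minimal sketch of that last fix, wrapping month 13 back to 1 (month_wrap_cond is a name I made up):
month_wrap_cond = """CASE WHEN new_month_of_pay > 12 THEN 1 ELSE new_month_of_pay END"""
df3 = df3.withColumn("new_month_of_pay", f.expr(month_wrap_cond))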
Adding a select clause before grouping or aggregation is always a good idea when you have a big dataframe, whether it's big in row count or column count. Lowering the size of the data to process will speed up your calculations.
NOTES: I used f.row_number instead of f.rank to get the top n categories. Here's the difference between row_number, rank, and dense_rank: row_number assigns a unique sequential number to every row in the window; rank gives tied rows the same rank and leaves gaps after the ties; dense_rank gives tied rows the same rank without gaps.
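A small sketch illustrating the three side by side over the same window (column names are the hypothetical ones used earlier):
import pyspark.sql.functions as f
from pyspark.sql.window import Window
win = Window.partitionBy("label_column").orderBy(f.col("count").desc())
df\
    .withColumn("row_number", f.row_number().over(win))\
    .withColumn("rank", f.rank().over(win))\
    .withColumn("dense_rank", f.dense_rank().over(win))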