Operations on Multiple Columns at Once
How to apply the same transformation function to multiple columns at once in PySpark, and how to apply different aggregation functions to different columns at once.
Applying Same Function to Multiple Columns
Example: To lower-case column names:
To lower-case row values, use the `f.lower()`
function from PySpark. You can also use a .select statement with a mapping, as shown in the StackOverflow answer below.
Resources: lower values in each row, but not column names - StackOverflow; performing operations on multiple columns in a PySpark dataframe - Medium
Change Type of Multiple Columns at Once
source: change types of multiple columns - Stackoverflow
Aggregating Different Columns w.r.t. Different Functions
First, create a list of the aggregation procedures you want for each group of columns
Notes
The select statement is not necessary, but it is good practice to optimize your query: you will be passing a smaller dataframe, making operations faster. In fact, if your dataframe is too big, whether in row count or column count, the operation can fail altogether if a select clause isn't used.
The asterisk you see whenever we pass a list to a function like select or agg is needed so that Spark operates on the list element by element; otherwise it would take the list as a single object rather than reading the elements on its own. Similar to
`:_*`
in Scala.
Sources and other ways to do it: aggregate multiple columns with multiple functions in PySpark - Blogspot
Another Way, the quick and dirty
The following syntax makes it easy to aggregate one column with different aggregation functions, but it doesn't allow aliasing, which might create problems later on if you want to do further analysis on the result or reference it, because the name of the outcome column will be of the format `function(columnname)`,
e.g. `count(col1)`.
Here `cols`
is a list of the columns you want to groupBy.
CountDistinct example
PySpark syntax
Spark Scala syntax
Note:
Adding a select clause before grouping or aggregating is always a good idea when you have a big dataframe, whether it's big in row count or column count. Lowering the size of the data to process speeds up your calculations. StackOverflow post
Counting distinct values for multiple columns, and getting those distinct values all at once.
Summing horizontally, i.e. summing row-wise
That is, summing one row across multiple columns, e.g. counting sales or medical diagnoses for one row in the dataframe.
The core of it:
The algorithm:
1. Binarize the columns: if a value is null, make it a zero; if there's a value other than null, make it a 1.
2. Combine the 1/0 values from all binarized columns into one ArrayType column you create.
3. Sum up the elements of that ArrayType column, thus counting the fields in which a value exists. You can also skip the binarizing step, of course, if you want to sum up the actual values in each column.
NOTES
cond1 is a SQL expression. The formula inside it is explained in the Scala section of this book.
I add a backslash at the end of PySpark commands so I can continue the command on the next line, breaking it up to make it easier to read.
The asterisk `*` tells the Spark function to operate element-wise on the list fed to it. Similar to `:_*` in Scala.
You can choose to put the elements in a list first, with
f.collect_list(f.col("colname"))
then use f.size()
on the column you created, but in this case null values, and spaces instead of nulls, will give you a headache because they would be counted. You can try circumventing that by replacing nulls with a distinct value before you create the array column, then use f.array_remove()
on the newly created array column from collect_list or collect_set (distinct values), then use f.size. Experiment with that; I haven't successfully done it this way. Search online for "removing null values from an array column Spark". Someone suggested `df.withColumn('vec_comb_clean', f.expr("FILTER(vec_comb_clean, x -> x IS NOT NULL)"))`,
though that didn't work as intended in my case for some reason; it would be more efficient if it did. Try it and verify on your own.
Categorical Variables Exploration
A very inefficient and cumbersome way to get the categories of each column per class of the label column, plus the top 10 categories and the percentage of nulls of each categorical column w.r.t. each label.
Since this function performs a heavy process, I do it outside the function below and feed its result to the next function: `count0, count1 = get_counts_per_label(colname=colname, df=df, labelcol="label_column")`
Next, create the dictionary of columns and their most frequent levels, accounting for both target labels individually.
Save the dictionary
To open it later
To save it as a Pandas df and then CSV,
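A sketch of the save/load round-trip, assuming the dictionary maps column names to their frequent levels; JSON is shown here, and pickle works the same way:

```python
import json
import os
import tempfile

import pandas as pd

# Hypothetical result: column -> most frequent levels
top_levels = {"col1": ["a", "b"], "col2": ["x"]}

path = os.path.join(tempfile.mkdtemp(), "top_levels.json")

# Save the dictionary
with open(path, "w") as fh:
    json.dump(top_levels, fh)

# To open it later
with open(path) as fh:
    loaded = json.load(fh)

# Save as a pandas dataframe, then CSV
pdf = pd.DataFrame(list(loaded.items()), columns=["column", "top_levels"])
pdf.to_csv(path.replace(".json", ".csv"), index=False)
```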
Using the dictionary to have a "rare" bucket of each categorical column
Saving the dataframe:
NOTES: I used f.row_number
instead of f.rank
to get the top n categories. Here's the difference between the three: difference between row number, rank, and dense rank - StackOverflow
Crude version: Fully describing categorical variables
This version does not take into account the counts of nulls w.r.t. the different classes of the label (target) variable, nor the frequency of each level within the categorical variable. It gets the number of levels in each variable, the percentage of nulls in them, and the first 10 values of the distinct levels.
Multi Condition Column
Suppose we have a condition on when a bill occurs, and we pay it out according to this schedule:
1st of the month through the 4th: paid on the 9th
5th through the 8th: paid on the 10th
...
25th through end of month: paid on the 1st of next month
Extracting the month of a date column
I'm going to also extract the day and year, thinking of edge cases like a bill arriving on the last day of the year, having to be paid the next month of the next year.
Creating the condition in SQL expression
The structure:
That's how we do it in SQL. Now you apply it with f.expr()
in a .withColumn() statement.
Here's how we apply the example above. Note that BETWEEN
is inclusive of both endpoints.
I skipped a couple of intervals, but you get the idea. The last one: anything after the 25th paid on the first of the next month is covered in the ELSE clause.
Finally, we apply it using f.expr within a .withColumn statement: `df2 = df1.withColumn("day_of_pay", f.expr(cond))`
Fixing the issues
First, we need a condition to move us to the next month. You can also add a condition for the year, for when the month is 12 and day_of_bill
is 31.
Condition for the year end
We end up with months that equal 13; we need to fix that.