Correlations in PySpark & Selecting Variables Based on a Correlation Threshold

Linear correlations are an important part of the EDA step. This page shows how to compute Pearson and Spearman correlations in PySpark and make sense of them, and how to quickly isolate the columns whose correlation value is greater than a threshold of our choosing.

Correlations in PySpark
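Correlation.corr expects the input to be a single vector column, so the numeric columns have to be assembled first. A minimal sketch, assuming numeric_columns_list holds the names of the columns you want to correlate (both names here are illustrative, not from the original):

from pyspark.ml.feature import VectorAssembler

# numeric_columns_list is a hypothetical list of the numeric feature columns;
# VectorAssembler packs them into one vector column named 'features',
# which is the column name passed to Correlation.corr below
assembler = VectorAssembler(inputCols=numeric_columns_list, outputCol='features')
df = assembler.transform(df)

The correlation matrix has one row and one column per assembled input, so keep that list of column names around: it is what labels the matrix when it is converted to Pandas further down.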

import numpy as np
from pyspark.ml.stat import Correlation

# df is assumed to already carry the assembled vector column 'features'
pearson = Correlation.corr(df, 'features', 'pearson').collect()[0][0]

print(str(pearson).replace("nan", "NaN"))

# save the matrices as CSV so they can be reloaded without recomputing
np.savetxt("path/pearsonCorrMatrix.csv", pearson.toArray(), delimiter=',')

spearman = Correlation.corr(df, 'features', method='spearman').collect()[0][0]
np.savetxt("path/spearmanCorrMatrix.csv", spearman.toArray(), delimiter=',')

# loading already saved correlation matrices
spearmanCorrMatrix = np.loadtxt("path/spearmanCorrMatrix.csv", delimiter=',')
pearsonCorrMatrix = np.loadtxt("path/pearsonCorrMatrix.csv", delimiter=',')

Convert the correlation matrices to Pandas dataframes and take the absolute values of the correlations

import pandas as pd

# label rows and columns with the feature names (these must be the columns
# that were assembled into the 'features' vector) and take absolute values
pearson_df = pd.DataFrame(pearsonCorrMatrix,
                          columns=df.columns,
                          index=df.columns).abs()

spearman_df = pd.DataFrame(spearmanCorrMatrix,
                           columns=df.columns,
                           index=df.columns).abs()

Select values above a specific threshold, i.e. with a chosen level of correlation

The function below selects the values in a correlation dataframe that lie above a chosen correlation value and returns the corresponding column pairs as a list of tuples, which makes it easy to feed the result back into Spark.

def get_important_corrs(corr_df, importance=0.1):
    # keep only the upper triangle (excluding the diagonal) to avoid
    # duplicate pairs and the trivial self-correlations of 1
    corr_triu = corr_df.where(~np.tril(np.ones(corr_df.shape)).astype(bool))
    corr_triu = corr_triu.stack()

    # keep the column pairs whose (absolute) correlation meets the threshold
    corr_triu_frame = corr_triu[corr_triu >= importance].to_frame()

    tuples_list = corr_triu_frame.index.to_list()

    return tuples_list
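A hedged usage sketch with the Pearson frame from above (the 0.7 threshold is just an example):

# each returned tuple is a (column, column) pair whose absolute
# Pearson correlation is at least 0.7
high_pearson_pairs = get_important_corrs(pearson_df, importance=0.7)
print(high_pearson_pairs)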

This function can be used not only with correlations, but also to select values from any Pandas dataframe relative to a threshold on a specific numeric variable/column.
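For example, the same thresholding idea on an arbitrary Pandas dataframe reduces to a plain boolean filter (any_pd_df and 'score' are made-up names):

# keep only the rows whose 'score' value meets the threshold
selected_rows = any_pd_df[any_pd_df['score'] >= 0.1]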

With a slight modification, you can also combine it with Spark's aggregated countDistinct function to pick the categorical columns that have more than a certain number of categories/levels, say to drop them later or give them other treatment, such as rolling them up to a higher level or simply bucketing them into frequent and rare categories. Modify the filter condition to >= or <= in the function below as needed.

from pyspark.sql import functions as f

# count the distinct levels of each column in categorical_columns_list
levelsdf = df\
    .agg(*[f.countDistinct(f.col(item)).alias(item) for item in categorical_columns_list])\
    .toPandas()

Then select those columns with more than 100 levels, for example. Since levelsdf has only a single row, the triangle mask from the correlation version is no longer needed; the modified function simply stacks the counts and filters them.

def get_high_level_cols(levels_df, count=100):
    # levels_df is the one-row Pandas dataframe of distinct counts per column;
    # stacking it yields one entry per column name with its level count
    levels = levels_df.stack()

    # keep the columns whose number of levels meets the threshold
    # (flip >= to <= to pick low-cardinality columns instead)
    levels_frame = levels[levels >= count].to_frame()

    # levels_frame.index.to_list() would return (row, column-name) tuples instead
    return levels_frame
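A hedged usage sketch: the index of the returned frame carries the column names, which can then be dropped from the Spark dataframe or routed to whatever other treatment you choose:

high_level_frame = get_high_level_cols(levelsdf, count=100)
# the second element of each index tuple is the column name
high_level_cols = [name for _, name in high_level_frame.index.to_list()]
df = df.drop(*high_level_cols)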
