Create Categories/Buckets Manually, and KS test
Motivation
We want to compare the distributions of two variables. We use the Kolmogorov-Smirnov (KS) test to decide whether the distributions of the two variables are the same or statistically significantly different.
The only problem: the two variables have different sample sizes, which prevents us from running the KS test immediately, or they are simply too large if you are working in a Spark dataframe. The solution to both hurdles is to discretize the two variables by putting the values of each variable into the same buckets (categories), chosen so that the intervals cover all possible values of both variables. Each bucket then holds the count of values falling within that interval, for each variable.
How to go about it?
There's a Bucketizer in the Spark ML module, so why categorize a column ourselves? Because the Bucketizer picks and chooses. That is, if you have values ranging from 0 to 100 and you want buckets of width 10, but you don't have any values in, say, the 60s or 70s, you'll end up with buckets numbered 1, 2, 3, 4, 5, 8, 9, skipping the 60s and 70s altogether, which ruins the analysis, especially when you want to test whether two columns follow the same distribution with the Kolmogorov-Smirnov test.
Below is a template that works in all cases, covering edge cases you didn't think about in your data, such as negative or very large values. I do that by making the low end $-\infty$ and the high end $+\infty$.
Creating the buckets
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql import Window
from scipy import stats
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import calendar
def make_categories(starting, ending, stepsize):
    """All arguments are integers. Returns (splits, labels) for half-open (a, b] buckets."""
    # finite split points, padded with -inf and +inf so any value falls in some bucket
    splitz = [-float("inf")] + list(np.arange(starting, ending, stepsize)) + [float("inf")]
    # one label per interval, e.g. "-inf--0", "0--500", ..., "9500--inf"
    labelz = ["-inf--{}".format(splitz[1])] + \
             ["{}--{}".format(str(v), str(splitz[n+2])) for n, v in enumerate(splitz[1:-1])]
    return (splitz, labelz)
# using it
datuple= make_categories(starting=0, ending=10_000, stepsize=500)  # must use this name; the UDF below is hardcoded to it
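To make the output concrete, here is what the splits and labels look like for a smaller, purely illustrative range (the comments are just the function's output written out):
# illustrative only: a smaller range so the output is easy to read
demo_splitz, demo_labelz = make_categories(starting=0, ending=2_000, stepsize=500)
# demo_splitz -> [-inf, 0, 500, 1000, 1500, inf]
# demo_labelz -> ['-inf--0', '0--500', '500--1000', '1000--1500', '1500--inf']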
# turn it to UDF
@f.pandas_udf(returnType=StringType())  # if this raises a "truth value is ambiguous" ValueError, see the note below about swapping in f.udf
def categorize(k):
    """Maps over a Pandas Series, as a lambda; or over a Spark DF column, as a UDF."""
    x = '0'  # default label for nulls / unmatched values
    if k is None:
        return x
    # walk the intervals (a, b]; the first starts at -inf and the last ends at +inf
    for n, v in enumerate(datuple[0][:-1]):
        if (k > v) & (k <= datuple[0][n+1]):  # half-open (a, b]
            x = datuple[1][n]
    return x
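Before wiring the UDF into Spark, you can sanity-check the bucketing logic locally with plain Pandas; pd.cut with right=True uses the same half-open (a, b] intervals as the loop above. This is just a sketch, and the sample values are made up:
# local sanity check in plain Pandas (sample values are made up)
sample = pd.Series([-5, 250, 500, 9_999, 50_000])
sample_buckets = pd.cut(sample, bins=datuple[0], labels=datuple[1], right=True)
print(sample_buckets)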
# applying it to a Spark dataframe
bucketed_df = df\
    .withColumn("buckets", categorize(f.col("varcol")))
aggd_bucketed = bucketed_df\
    .groupBy("condcol", "anothercol", "buckets")\
    .agg(f.count(f.col("varcol")).alias("counts_per_bucket"))
In this case, condcol is the column holding the binary condition of the data (Type A vs. Type B, i.e. 0/1), varcol is the variable I need to put in buckets (categorize), and anothercol is anything else you might want to group by as well, like dates for example, but you don't need to add it.
IMPORTANT: If you try to do bucketed_df.display() and it errors out with a ValueError about an ambiguous truth value, then replace f.pandas_udf with f.udf on the decorator line in the snippet above. This kind of "truth value is ambiguous" error shows up with Pandas sometimes, because a pandas_udf receives a whole Pandas Series rather than a single value.
pandas_udf was supposed to be faster than udf in PySpark, since UDFs in PySpark are notoriously slow in comparison to Spark Scala's UDFs.
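If you'd rather keep the speed advantage of pandas_udf, one possible fix (not the version used above, and it assumes Spark 3.x with Series-to-Series Pandas UDF support) is to make the function fully vectorized, for example with pd.cut, so it never evaluates a whole Series inside an if statement:
# a possible vectorized variant; it operates on the whole Series at once,
# so the ambiguous-truth-value error cannot occur
@f.pandas_udf(returnType=StringType())
def categorize_vectorized(k: pd.Series) -> pd.Series:
    return pd.cut(k, bins=datuple[0], labels=datuple[1], right=True).astype(str)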
☞ Generally, it's advised to do frequent checks with .display(), .show(), or .count(). The reason is that those force Spark to execute the transformations you've been feeding it up to that point, so if there are any hidden errors, those commands should reveal them.
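For example, after defining bucketed_df above, a cheap check like the following forces the pending transformations to actually run on a small sample (a sketch; the limit just keeps it inexpensive):
# force execution of the lazy transformations on a small sample
bucketed_df.select("varcol", "buckets").limit(20).show()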
Convert to Pandas to prepare to run KS test
Worth noting: in my use case here, I had a binary column that held a condition, basically whether the data is, say, Type A or Type B, where those types are something related to the data I was dealing with. The idea was to compare whether the distribution of a certain variable, call it X, is different when the data is Type A from when it is Type B. I refer to that as "two variables" to keep it consistent with the KS-test explanations you'd find online.
aggd_bucketed_pandas = aggd_bucketed.toPandas()
def get_pandas_all_buckets(main_df= aggd_bucketed_pandas):
    """Gives a tuple: (aggregated_case1, aggregated_case2), in that order, ready for KS testing."""
    # 'value1'/'value2' stand for the two values of the condition column, e.g. Type A / Type B
    df_cond1 = main_df[main_df['condcol'] == 'value1']
    df_cond2 = main_df[main_df['condcol'] == 'value2']
    # master bucket list of the two dataframes
    full_bucket_list = np.unique(
        np.concatenate([df_cond1['buckets'].unique(),
                        df_cond2['buckets'].unique()]))
    # buckets missing from each df
    missing_dfcond1 = np.setdiff1d(full_bucket_list, df_cond1['buckets'].unique())
    missing_dfcond2 = np.setdiff1d(full_bucket_list, df_cond2['buckets'].unique())
    # also, fill empty buckets with zero
    res_cond1 = pd.concat([df_cond1, pd.DataFrame(missing_dfcond1, columns=['buckets'])], sort=False)\
        .fillna({'counts_per_bucket': 0})
    res_cond2 = pd.concat([df_cond2, pd.DataFrame(missing_dfcond2, columns=['buckets'])], sort=False)\
        .fillna({'counts_per_bucket': 0})
    return (res_cond1, res_cond2)
# get them
test_cond1, test_cond2= get_pandas_all_buckets(main_df= aggd_bucketed_pandas)
# if get_pandas_all_buckets executed correctly,
# tcond1 and tcond2 should have the same length and same index, with different values of course
tcond1 = test_cond1.groupby('buckets')['counts_per_bucket'].sum()
tcond2 = test_cond2.groupby('buckets')['counts_per_bucket'].sum()
You could consider the step that computes tcond1 and tcond2 redundant, but it converts the desired buckets and their values into sorted Pandas Series, which are easier to compare and work with.
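As an aside, a more compact way to get the same two aligned Series (a sketch that re-uses the aggd_bucketed_pandas column names from above, with 'value1'/'value2' again standing in for the two condition values) is to group first and then reindex both onto the union of their bucket indexes:
# sketch: align the two conditions via reindex instead of concatenating missing rows
grouped = aggd_bucketed_pandas.groupby(['condcol', 'buckets'])['counts_per_bucket'].sum()
s1, s2 = grouped.loc['value1'], grouped.loc['value2']
all_buckets = s1.index.union(s2.index)
tcond1_alt = s1.reindex(all_buckets, fill_value=0).sort_index()
tcond2_alt = s2.reindex(all_buckets, fill_value=0).sort_index()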
Plotting and KS testing
KS test results will be printed on the plot
def test_and_plot(arrcond1= tcond1, arrcond2= tcond2):
    # normalize the counts so each condition sums to 1 (i.e. turn counts into probabilities)
    arrcond1_normalized = arrcond1/sum(arrcond1)
    arrcond2_normalized = arrcond2/sum(arrcond2)
    counts_only_statistic, counts_only_p_value = stats.ks_2samp(arrcond1_normalized, arrcond2_normalized)
    # plotting
    fig, ax = plt.subplots(figsize=(9, 6))
    bar_width = 0.4
    plt.title("over the last two years".title())
    fig.suptitle("p-value={:1.3f}, KS-stat= {:1.3f}".format(counts_only_p_value, counts_only_statistic), fontsize=14)
    x = list(range(arrcond2_normalized.shape[0]))  # both conditions are the same length now
    x_offset = [i + bar_width for i in x]
    b1 = ax.bar(x, arrcond1_normalized,
                width= bar_width, label="condition 1")
    b2 = ax.bar(x_offset, arrcond2_normalized,
                width= bar_width, label="condition 2")
    ax.legend()
    # fix the x-axis
    ax.set_xticks([i + bar_width / 2 for i in x])
    ax.set_xticklabels(arrcond2_normalized.index.to_list(), rotation=90)  # both have the same index
    ax.set_xlabel("target variable".title())
    ax.set_ylabel("Probabilities")
    return None
# apply it
test_and_plot()
Interpreting Results of the Kolmogorov-Smirnov test
When doing a two-sample KS test, if the KS statistic is small or the p-value is high, then we cannot reject the hypothesis that the distributions of the two samples are the same (scipy docs).
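In code, with the usual 0.05 significance level, the decision rule would read like this (a sketch re-using the same normalized counts as test_and_plot above):
# sketch: explicit read-out of the KS result at alpha = 0.05
ks_stat, p_value = stats.ks_2samp(tcond1/sum(tcond1), tcond2/sum(tcond2))
if p_value > 0.05:
    print("cannot reject that both samples come from the same distribution")
else:
    print("the two distributions are statistically significantly different")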
Resources
Numpy set UFuncs: w3schools.com/python/numpy/numpy_ufunc_set_operations.asp