Create Categories/Buckets Manually, and KS test

Motivation

We want to compare the distributions of two variables. We use the Kolmogorov-Smirnov (KS) test to decide whether the distributions of the two variables are the same or statistically significantly different.

The only problem: the two variables have different sample sizes, which gets in the way of running the KS test directly, or they are simply too large if you're working in a Spark dataframe. The solution to both hurdles is to discretize the two variables by putting the values of each variable into the same buckets (categories), so that those intervals cover all possible values of both variables. Each bucket then holds the count of values falling in that interval, for each variable.
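As a toy illustration (plain NumPy, outside Spark, with made-up samples): once both samples are counted on the same set of bin edges, two samples of different sizes become two equal-length count vectors over identical intervals.

import numpy as np

rng = np.random.default_rng(0)
sample_a = rng.normal(50, 10, size=1_000)    # 1,000 observations
sample_b = rng.normal(52, 10, size=250)      # only 250 observations

edges = np.arange(0, 110, 10)                # shared bucket edges for both samples
counts_a, _ = np.histogram(sample_a, bins=edges)
counts_b, _ = np.histogram(sample_b, bins=edges)
# counts_a and counts_b now have the same length and describe the same intervals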

How to go about it?

There's a Bucketizer in the Spark ML module, so why categorize a column ourselves? Because the Bucketizer picks and chooses. That is, if you have values ranging from 0 to 100 and you want buckets of, say, 10, but no values land in, say, the 60s or 70s, you'll end up with buckets like 1, 2, 3, 4, 5, 8, 9, skipping the 60s and 70s altogether, which ruins the analysis. Especially when you want to test whether two columns follow the same distribution with the Kolmogorov-Smirnov test.

This is a template that works in all cases, covering edge cases you didn't think about in your data, like values that are negative or very large. I do that by making the low end $-\infty$ and the high end $+\infty$.
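A tiny sketch of why the infinite ends matter: with $-\infty$ and $+\infty$ as the outer edges, every value, however extreme, still lands in some bucket instead of being dropped (toy edges, independent of the Spark code below).

import numpy as np

edges = [-np.inf, 0, 50, 100, np.inf]   # toy split points with infinite outer ends
for value in [-3.2, 17, 99, 1e9]:
    # right=True matches the half-open (a, b] convention used further down
    print(value, "-> bucket", np.digitize(value, edges, right=True))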

Creating the buckets

from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql import Window

from scipy import stats
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import calendar

def make_categories(starting, ending, stepsize):
    """Enter all integers. Returns (split points, labels): the split points
    run from -inf to +inf so every possible value falls in some bucket."""
    splitz= [-float("inf")] + list(np.arange(starting, ending, stepsize)) + [float("inf")]
    labelz= ["-inf--{}".format(splitz[1])] + \
            ["{}--{}".format(str(v), str(splitz[n+2])) for n,v in enumerate(splitz[1:-1])]

    return (splitz, labelz)
	
# using it
datuple= make_categories(starting=0, ending=10_000, stepsize=500) #must use this name, udf below is hardcoded
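# With the call above, datuple[0] (the split points) looks like
# [-inf, 0, 500, 1000, ..., 9500, inf]
# and datuple[1] (the labels) looks like
# ['-inf--0', '0--500', '500--1000', ..., '9500--inf']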

# turn it to UDF
@f.pandas_udf(returnType=StringType())
def categorize(k):
    """Maps to a Pandas series, as a lambda; or to a Spark DF column, as a UDF."""
    x= '0'  # fallback label for values not captured by any bucket
    for n,v in enumerate(datuple[0][1:-1]):
        if (k>v) & (k<=datuple[0][n+2]): # half-open interval (a, b]
            x= datuple[1][n+1]
    return x

# applying it to spark dataframe
bucketed_df= df\
.withColumn("buckets", categorize(f.col("varcol")))

aggd_bucketed= bucketed_df\
.groupBy("condcol", "anothercol", "buckets")\
.agg(f.count(f.col("varcol")).alias("counts_per_bucket"))

In this case, condcol is the column holding the binary Type A / Type B flag, i.e. 0/1; varcol is the variable I need to put into buckets (categorize); and anothercol is anything else you might want to group by as well, dates for example, but you don't need to include it.
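For concreteness, a hypothetical toy dataframe with that shape might look like this (assumes an active SparkSession named spark; the column names match the snippet above, the values are made up):

# condcol: 0/1 Type A / Type B flag; varcol: the value to bucket;
# anothercol: an optional extra grouping key such as a month
toy_df = spark.createDataFrame(
    [(0, 120.0, "2023-01"), (0, 4980.5, "2023-01"),
     (1, 75.0, "2023-02"), (1, 10250.0, "2023-02")],
    ["condcol", "varcol", "anothercol"])

toy_bucketed = toy_df.withColumn("buckets", categorize(f.col("varcol")))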

IMPORTANT: If you try bucketed_df.display() and it errors out with a ValueError about an ambiguous truth value, replace f.pandas_udf with f.udf in the decorator above the categorize function.

This kind of "truth value is ambiguous" error shows up with Pandas sometimes. pandas_udf is supposed to be faster than udf in PySpark, since Python UDFs in PySpark are notoriously slow in comparison to Spark Scala's UDFs.
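If you hit that error, the row-at-a-time variant is the same function with only the decorator changed; with f.udf, k arrives as a single Python value per row rather than a pandas Series, so the scalar comparisons are unambiguous. A sketch (with a small guard for nulls added):

# plain (row-at-a-time) UDF variant: k is a scalar here, not a pandas Series
@f.udf(returnType=StringType())
def categorize(k):
    x = '0'
    if k is None:          # nulls keep the fallback label
        return x
    for n, v in enumerate(datuple[0][1:-1]):
        if (k > v) and (k <= datuple[0][n+2]):  # half-open interval (a, b]
            x = datuple[1][n+1]
    return x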

☞ Generally, it's advisable to do frequent checks with .display(), .show(), or .count(). The reason is that those actions force Spark to execute the transformations you've been feeding it up to that point, so if there are any hidden errors, those commands should reveal them.
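For example, a quick sanity check at this point could be:

# both of these force execution of the lazy transformations defined so far,
# so errors inside the UDF surface here rather than later
bucketed_df.show(5, truncate=False)
aggd_bucketed.count()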

Convert to Pandas to prepare to run KS test

Worth noting: in my use case here, I had a binary column encoding a condition, basically whether the data is Type A or Type B, where those types are something specific to the data I was dealing with. The idea was to compare whether the distribution of a certain variable, call it X, differs between Type A data and Type B data. I refer to that as "two variables" to keep it consistent with the KS-test explanations you'd find online.

aggd_bucketed_pandas = aggd_bucketed.toPandas()

def get_pandas_all_buckets(main_df= aggd_bucketed_pandas):
    """Gives a tuple: (aggregated_case1, aggregated_case2) in that order; both are ready for KS testing."""
    # 'value1' and 'value2' are the two values of the condition column, e.g. 0 and 1
    df_cond1= main_df[main_df['condcol']=='value1']
    df_cond2= main_df[main_df['condcol']=='value2']

    # master bucket list across the two dataframes
    full_bucket_list= np.unique(
        np.concatenate([df_cond1['buckets'].unique(),
                        df_cond2['buckets'].unique()]))

    # buckets missing from each df
    missing_dfcond1= np.setdiff1d(full_bucket_list, df_cond1['buckets'].unique())
    missing_dfcond2= np.setdiff1d(full_bucket_list, df_cond2['buckets'].unique())

    # also, fill empty buckets with zero
    res_cond1= pd.concat([df_cond1, pd.DataFrame(missing_dfcond1, columns=['buckets'])], sort=False)\
        .fillna({'counts_per_bucket':0})

    res_cond2= pd.concat([df_cond2, pd.DataFrame(missing_dfcond2, columns=['buckets'])], sort=False)\
        .fillna({'counts_per_bucket':0})

    return (res_cond1, res_cond2)
	
# get them
test_cond1, test_cond2= get_pandas_all_buckets(main_df= aggd_bucketed_pandas)

# if test buckets executed correctly
# tcond1 and tcond2 should be the same lengths, same index, with different values of course
tcond1= test_cond1.groupby('buckets')['counts_per_bucket'].sum()
tcond2= test_cond2.groupby('buckets')['counts_per_bucket'].sum()

You might consider the step that computes tcond1 and tcond2 redundant, but it converts the buckets of interest, with their counts, into a sorted Pandas Series, which is easier to compare and work with.
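A quick check that the two Series really do line up before testing (a small sketch, assuming the steps above ran as shown):

# the two Series should share the exact same sorted bucket index
assert tcond1.index.equals(tcond2.index)
print(len(tcond1), len(tcond2))
print(tcond1.head())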

Plotting and KS testing

KS test results will be printed on the plot

def test_and_plot(arrcond1= tcond1, arrcond2= tcond2):
    arrcond1_normalized= arrcond1/sum(arrcond1)
    arrcond2_normalized= arrcond2/sum(arrcond2)

    counts_only_statistic, counts_only_p_value= stats.ks_2samp(arrcond1_normalized, arrcond2_normalized)

    # plotting
    fig, ax= plt.subplots(figsize=(9,6))
    bar_width= 0.4
    plt.title("over the last two years".title())
    fig.suptitle("p-value={:1.3f}, KS-stat= {:1.3f}".format(counts_only_p_value, counts_only_statistic), fontsize=14)

    x= list(range(arrcond2_normalized.shape[0])) # both are the same length now, for both conditions
    x_offset= [i + bar_width for i in x]

    b1= ax.bar(x, arrcond1_normalized,
               width= bar_width, label="condition 1")

    b2= ax.bar(x_offset, arrcond2_normalized,
               width= bar_width, label="condition 2")
    ax.legend()

    # fix the x-axis
    ax.set_xticks([i + bar_width / 2 for i in x])
    ax.set_xticklabels(arrcond2_normalized.index.to_list(), rotation=90) # both have the same index
    ax.set_xlabel("target variable".title())
    ax.set_ylabel("Probabilities")

    return None
	
# apply it
test_and_plot()

Interpreting the Results of the Kolmogorov-Smirnov test

When doing a two-sample KS test, if the KS statistic is small or the p-value is high, then we cannot reject the hypothesis that the distributions of the two samples are the same (scipy docs).
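As a rough decision rule on top of the values printed on the plot (a sketch, using the conventional 0.05 significance level and the same normalized counts as in test_and_plot):

alpha = 0.05
stat, p_value = stats.ks_2samp(tcond1/sum(tcond1), tcond2/sum(tcond2))
if p_value < alpha:
    print("Reject H0: the two distributions appear significantly different.")
else:
    print("Cannot reject H0: no evidence the two distributions differ.")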

Resources

Numpy set UFuncs: w3schools.com/python/numpy/numpy_ufunc_set_operations.asp

Numpy combining and reshaping

grouped data charts
