CountVectorizer to one-hot encode multiple columns at once

Binarize multiple columns at once. Particularly useful if you want to count, for each categorical column, how many times each category occurred per partition; e.g. partitioning by customer ID.

Motivation

We have categorical variables, per customer per transaction, and those variables can vary from transaction to transaction. For example, customer 1234 uses their credit card to purchase from different locations and retail categories, and we want to count how many purchases they made from each location and each retail category in the past year. Or take a member with a different diagnosis code per medical claim: we want to know how many different conditions this member has seen a medical professional for in the past year, or over their membership term with the insurance company.

Steps

Starting with the list of columns we want to count, the goal is to count how many of each category exists within each group (a group can be defined by a single column, like customer ID, or by multiple columns).

  1. If you have any numerical columns you want to treat as strings (e.g. categories that were entered as numbers), convert those columns to strings now, using f.col(...).cast("string") in PySpark. This is necessary because CountVectorizer requires an array of strings.

  2. Fill nulls in all the columns you want to count with a distinct value, like "NULL_VALUE" or similar. This is also a necessary step: CountVectorizer needs each collected list to have at least one element, so a group whose values are all null will cause problems. (A short sketch of steps 1 and 2 follows this list.)

  3. All in one step, do the following:

    • group by the columns you want to group by, e.g. customer ID

    • with f.collect_list(), aggregate all the categorical columns you need

    • for the numerical columns you still want in the result table, aggregate with either f.sum() or f.count()

    • since CountVectorizer only accepts a single inputCol (there is no inputCols option, at least in older PySpark), we run a for-loop and fit/transform one categorical column at a time; not ideal, since each column requires its own fit over the data, but I hope a multi-column option is added in the future

  4. Create a pipeline with VectorAssembler and PCA to build the features column, then use it in whatever ML model you like (a sketch of this step follows the count_levels() function below).
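A minimal sketch of steps 1 and 2, assuming a dataframe df, a hypothetical numeric category column merchant_category_code, and a categorical_columns list defined elsewhere in the notebook:

from pyspark.sql import functions as f

# step 1: cast a numeric category column to string (the column name here is just an example)
df = df.withColumn("merchant_category_code",
                   f.col("merchant_category_code").cast("string"))

# step 2: replace nulls in every categorical column with a distinct placeholder,
# so no group ends up with nothing but nulls to collect
df = df.fillna("NULL_VALUE", subset=categorical_columns)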

NOTES

  • I'm borrowing this technique from NLP, so it makes sense that the Spark developers didn't add multi-column support to CountVectorizer; there's usually a single column holding the corpus you want to count.

  • Also, as is customary in NLP, you need a way to reduce the dimensionality of your dataframe after CountVectorizer; I used PCA.

  • You can also use OneHotEncoder for our purpose here, then aggregate each binary column by summing to get the counts of each category in each categorical column. But if you have even one categorical column with too many levels (I have over 25,000 distinct categories in one of mine), it becomes much more resource intensive, as you'll basically be re-implementing what CountVectorizer does. You still need a dimensionality-reduction step in either case.

  • You can try (I haven't) combining all the categorical variables into one corpus-like column first, using the f.concat_ws() function with a space as the separator, then feeding that to CountVectorizer. However, make sure you groupBy your grouping columns first, because if you f.collect_list() that combined column after splitting it into an array, you'll end up with a list of lists, unnecessarily complicating things. A rough sketch is below.
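For the concat_ws() idea in the last note, a rough, untested sketch with hypothetical column names (note that categories containing spaces would be split into separate tokens, and identical strings coming from different columns would merge into one token; f.flatten() needs Spark 2.4+):

from pyspark.sql import functions as f
from pyspark.ml.feature import CountVectorizer

# per row: join all categorical values into one space-separated string, then split it into an array
combined = df.withColumn("all_cats",
                         f.split(f.concat_ws(" ", *categorical_columns), " "))

# per group: flatten the collected arrays into one flat token list, avoiding the list-of-lists problem
agg_df = combined.groupBy("customer_id")\
	.agg(f.flatten(f.collect_list("all_cats")).alias("collected_all"))

agg_df = CountVectorizer(inputCol="collected_all", outputCol="CVed_all").fit(agg_df).transform(agg_df)

Back to the main approach; the full counting function is below.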

from pyspark.sql import functions as f
from pyspark.ml.feature import CountVectorizer

def count_levels(df):
	"""There are static variables inside this function (categorical_columns,
	columns_to_sum, columns_to_count, grouping_columns); they could be fed in as
	arguments, or defined earlier in the notebook as named here. I also have two
	target labels and want to count by those as well; see the Extra section below."""
	local_df = df

	# collect each categorical column into a list per group; sum/count the numerical ones
	to_agg = [f.collect_list(f.col(item)).alias(f"collected_{item}") for item in categorical_columns] \
		+ [f.sum(f.col(item)).alias(item) for item in columns_to_sum] \
		+ [f.count(f.col(item)).alias(item) for item in columns_to_count]

	local_df = local_df\
		.groupBy(grouping_columns)\
		.agg(*to_agg)

	# CountVectorizer only takes a single inputCol, so fit/transform one column at a time
	for item in categorical_columns:
		count_vec = CountVectorizer(inputCol=f"collected_{item}",
			outputCol=f"CVed_{item}",
			binary=False,
			minDF=1)

		local_df = count_vec.fit(local_df).transform(local_df)
		local_df = local_df.drop(f"collected_{item}")

	return local_df
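And a sketch of step 4, assembling the CVed_ vectors (plus the numerical aggregates) and reducing dimensionality with PCA; the assembled column name and the k value are placeholder assumptions:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, PCA

counted_df = count_levels(df=df)

# put every CountVectorizer output (and the numerical aggregates) into one vector
assembler = VectorAssembler(
	inputCols=[f"CVed_{item}" for item in categorical_columns] + columns_to_sum + columns_to_count,
	outputCol="assembled_features")

# shrink the very wide sparse vector down to something a model can digest; k=50 is just a placeholder
pca = PCA(k=50, inputCol="assembled_features", outputCol="features")

pipeline_model = Pipeline(stages=[assembler, pca]).fit(counted_df)
features_df = pipeline_model.transform(counted_df)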

Extra - Category count/percent per target label

I had a little extra in my use case: the target variable had two classes, and I wanted to count categories per class, to see whether any category has a stronger, or even sole, presence in one class over the other. I repeated that for all categorical columns. Below are the additions to the function above to accommodate that.

def get_counts_per_label(colname, df):
	"""Results are used in the count_levels() function below. I define my global
	variables (like TARGET_VARIABLE) in all caps at the beginning of the notebook
	and just keep plugging them in throughout."""

	# total row count per target class; .collect() brings these two rows to the driver
	tup = df\
		.select(colname, TARGET_VARIABLE)\
		.groupBy(TARGET_VARIABLE)\
		.agg(f.count(f.col(colname)).alias('count'))\
		.collect()

	count0 = [row['count'] for row in tup if row[TARGET_VARIABLE] == 0][0]
	count1 = [row['count'] for row in tup if row[TARGET_VARIABLE] == 1][0]

	return count0, count1

Because it uses .collect(), this function could take a long time to execute and be expensive, depending on the size of the data you're handling and the size of your Spark cluster. I recommend nothing less than 4 TB total memory with 4 TB storage for reasonably sized dataframes in daily data-science discovery and testing work, but I use 16 TB total memory with 16 TB total storage for the actual feature engineering, data preparation, and modeling. All of that to say: you can run get_counts_per_label() inside the category-counting function, or run it beforehand and pass its results in, to break the process into smaller pieces.

def count_levels(df, categorical_columns):
	# total rows per target class, used below to turn group counts into class percentages
	count0, count1 = get_counts_per_label(colname=TARGET_VARIABLE, df=df)

	local_df = df

	to_agg = [f.collect_list(f.col(item)).alias(f"collected_{item}") for item in categorical_columns] \
		+ [f.sum(f.col(item)).alias(item) for item in columns_to_sum] \
		+ [f.count(f.col(item)).alias(item) for item in columns_to_count] \
		+ [f.count(f.col(TARGET_VARIABLE)).alias("target_count")]

	# group by the usual grouping columns plus the target label, then express
	# each group's row count as a share of its class total
	local_df = local_df\
		.groupBy(grouping_columns + [TARGET_VARIABLE])\
		.agg(*to_agg)\
		.withColumn("label_percent",
			f.when(f.col(TARGET_VARIABLE) == 0, f.col("target_count") / f.lit(count0))
			.when(f.col(TARGET_VARIABLE) == 1, f.col("target_count") / f.lit(count1))
			.otherwise(None))

	# same per-column CountVectorizer loop as before
	for item in categorical_columns:
		count_vec = CountVectorizer(inputCol=f"collected_{item}",
			outputCol=f"CVed_{item}",
			binary=False,
			minDF=1)

		local_df = count_vec.fit(local_df).transform(local_df)
		local_df = local_df.drop(f"collected_{item}")

	return local_df
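A possible way to call the extended version and eyeball how each group splits across the two classes (using only the variables already defined above):

result = count_levels(df=df, categorical_columns=categorical_columns)

# inspect each group's row count and its share of the corresponding class total
result.select(TARGET_VARIABLE, "target_count", "label_percent")\
	.orderBy(f.col("label_percent").desc())\
	.show(20, truncate=False)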
