Modeling in PySpark as a Function for Faster Testing

Wrapping the model in a function

The idea is to wrap the model in a function that takes the input data plus arguments controlling the model's hyperparameters. The function prints the test results and returns the fitted model object, which makes faster testing and manual hyperparameter tuning easier.

import pyspark.sql.functions as f
from pyspark.sql.types import *

train_set, test_set = df.randomSplit([0.75, 0.25], seed=RANDOM_SEED_VALUE)

def lr_models(train_set=train_set, test_set=test_set, el=0.0, reg=0.0):
	from pyspark.ml.regression import LinearRegression
	from pyspark.ml.evaluation import RegressionEvaluator
	
	lr = LinearRegression(featuresCol='features',
		labelCol=TARGET_VARIABLE,
		predictionCol='lr_prediction',
		elasticNetParam=el,
		regParam=reg,
		standardization=False)
	
	# train the model
	lr_model = lr.fit(train_set)
	
	# predict on the test set; and on the train set to check for overfitting
	preds = lr_model.transform(test_set)
	train_preds = lr_model.transform(train_set)
	
	# score with MSE
	re_mse = RegressionEvaluator(predictionCol='lr_prediction',
		labelCol=TARGET_VARIABLE,
		metricName='mse')
	# you can add another metric, R-squared for instance
	re_r2 = RegressionEvaluator(predictionCol='lr_prediction',
		labelCol=TARGET_VARIABLE,
		metricName='r2')
	
	print("R-Squared for Testing Set: ", re_r2.evaluate(preds))
	print("R-Squared for Training Set: ", re_r2.evaluate(train_preds))
	print("***********")
	print("Mean Squared Error for Testing Set: ", re_mse.evaluate(preds))
	print("Mean Squared Error for Training Set: ", re_mse.evaluate(train_preds))
	
	return lr_model

Apply it with various ElasticNet and regularization parameter values, e.g. lr_033_05 = lr_models(el=.033, reg=.5). In Spark's elastic net formulation, the L1 (Lasso) term is weighted by elasticNetParam and the L2 (Ridge) term by (1 - elasticNetParam)/2, so a value close to 1/3 gives the two penalties roughly equal weight.
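
A minimal sketch of manual tuning with the function above; the grid values here are only illustrative:

# illustrative manual grid; adjust the values to your data
for el in [0.0, 0.33, 1.0]:        # 0 = pure Ridge, 1 = pure Lasso
	for reg in [0.0, 0.1, 0.5]:    # overall regularization strength
		print(f"--- elasticNetParam={el}, regParam={reg} ---")
		model = lr_models(el=el, reg=reg)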

Checking Errors

It's essential to check the errors of a regression model: you want to make sure there's no heteroscedasticity or other issues with them. Since regression error metrics are just averages, it's worth plotting the errors to see for yourself their range and shape. You can run a summary on them with Spark:

predictions = lr_033_05.transform(holdout_set)
errors_df = predictions.withColumn("errors", f.col(TARGET_VARIABLE) - f.col("lr_prediction"))
errors_df.select("errors").summary().show()

Extras

Predicting on the holdout set, w.r.t. the target variable classes, for some column of interest

Predicting is done with the .transform() method on the model object, as shown in the modeling function above.

predictions = lr_033_05.transform(holdout_set)

# col1 is the column of interest, a continuous variable
results_df = predictions\
	.groupBy(grouping_columns)\
	.agg(*[f.sum(f.col(c)).alias(c) for c in ["lr_prediction", "col1"]])\
	.withColumn("target_ratio", f.col('lr_prediction') / f.col('col1'))\
	.drop('lr_prediction', 'col1')

If there's an interesting continuous column, find the ratio between actual and predicted

Suppose, further, I want to see how the column of interest col1 behaves between the actual values of the target variable and those predicted by the model.

def checkup_wrt_type(preds_df=predictions):
	"""`col1` in the code below is the interesting column, a continuous one.
	`col2` is a categorical variable of importance."""
	one = preds_df\
		.groupBy('col2')\
		.agg(*[f.sum(f.col(c)).alias(f"sum_{c}") for c in ['lr_prediction', 'col1']])\
		.withColumn("target_ratio_Prediction",
			f.col("sum_lr_prediction") / f.col('sum_col1'))\
		.drop('sum_lr_prediction', 'sum_col1')
	
	two = preds_df\
		.groupBy('col2')\
		.agg(*[f.sum(f.col(c)).alias(f"sum_{c}") for c in [TARGET_VARIABLE, 'col1']])\
		.withColumn("target_ratio_Actual",
			f.col(f"sum_{TARGET_VARIABLE}") / f.col('sum_col1'))\
		.drop(f"sum_{TARGET_VARIABLE}", 'sum_col1')
	
	joined_df = one.join(two, on='col2')\
		.withColumn("ratio_Predicted_to_Actual",
			f.bround(f.col("target_ratio_Prediction") / f.col("target_ratio_Actual"), 3))
	
	return joined_df
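
A quick usage sketch, assuming the column names col2 and col1 from the docstring exist in predictions:

# per-category comparison of predicted vs. actual ratios; .show() is for display only
checkup_df = checkup_wrt_type(preds_df=predictions)
checkup_df.orderBy('col2').show(truncate=False)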

NOTE: never use the f.bround() function while you're still processing columns, because rounding mid-pipeline skews results. I use it only on the final result, and only for display and reporting purposes.
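
For instance, keep the full-precision column in the DataFrame and round a copy only when reporting (results_df from the aggregation above):

# round only for display; the underlying 'target_ratio' stays at full precision
report_df = results_df.withColumn("target_ratio_display", f.bround(f.col("target_ratio"), 3))
report_df.show(truncate=False)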
