Cross Validation in Spark

from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# global variables
RANDOM_SEED_VALUE = 42
TARGET_VARIABLE = 'target_column'
ID_COLUMN = 'id_column'

# import, clean, and prepare your data here
# don't forget to apply exactly the same preparation to the data you will predict on!
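# A minimal sketch of this step, hedged: the CSV path and the "category_col" column
# are hypothetical placeholders, dropping rows with a missing target may not be
# enough cleaning for your data, and `spark` is assumed to be the active SparkSession.
raw_df = spark.read.csv("path/to/data.csv", header=True, inferSchema=True)
clean_df = raw_df.dropna(subset=[TARGET_VARIABLE])

# string-typed categorical columns must be numerically encoded before
# VectorAssembler can use them; that is what StringIndexer (imported above) is for
category_indexer = StringIndexer(inputCol="category_col",
                                 outputCol="category_col_idx",
                                 handleInvalid="keep")
clean_df = category_indexer.fit(clean_df).transform(clean_df).drop("category_col")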

feats = [item for item in clean_df.columns if item not in [TARGET_VARIABLE, ID_COLUMN]]  # the exclusion list can also be a tuple (TARGET_VARIABLE, ID_COLUMN)

assembler = VectorAssembler(inputCols=feats, outputCol="features", handleInvalid='keep')

df = assembler.transform(clean_df)

# train/test split
reg_train, reg_test = df.randomSplit([.67, .33], seed=RANDOM_SEED_VALUE)

# instantiating the model
gbtr = GBTRegressor(labelCol=TARGET_VARIABLE,
                    featuresCol="features",
                    predictionCol="prediction_gbtr",
                    seed=RANDOM_SEED_VALUE)

# create the parameter grid for cross validation
paramgrid = (ParamGridBuilder()
             .addGrid(gbtr.maxDepth, [5, 3])
             .addGrid(gbtr.stepSize, [0.01, 0.5, 0.1])
             .addGrid(gbtr.minInfoGain, [0.01, 0.1])
             .build())
# the outer parentheses are only there so Python accepts the line breaks, which keeps the chain readable

# create the regression evaluator for metrics
re = RegressionEvaluator(predictionCol='prediction_gbtr',
                         labelCol=TARGET_VARIABLE,
                         metricName='rmse')
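# CrossValidator uses this evaluator to rank the grid points; since 'rmse' is a
# lower-is-better metric, the evaluator's isLargerBetter flag tells it to minimize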

# cross validator
cv = CrossValidator(estimator=gbtr,
                    estimatorParamMaps=paramgrid,
                    evaluator=re,
                    numFolds=5,
                    seed=RANDOM_SEED_VALUE,
                    parallelism=64 * 2,
                    collectSubModels=False)
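# with this grid the CrossValidator evaluates 2 * 3 * 2 = 12 parameter combinations
# over 5 folds, i.e. 60 fits, then refits the best combination once on the full
# input data, so a generous parallelism setting pays off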

# fit the cross validator on the training split; reg_test stays held out for the final check
cv_model = cv.fit(reg_train)
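To see how the tuned model generalizes, you can score the held-out test split with the same evaluator and inspect the winning configuration. A minimal sketch using the standard CrossValidatorModel API (variable names follow the code above; the per-parameter getters such as getMaxDepth() assume PySpark 3.x):

preds_test = cv_model.transform(reg_test)
print("test RMSE:", re.evaluate(preds_test))

best_gbtr = cv_model.bestModel          # the GBT model refit with the best grid point
print("best maxDepth:", best_gbtr.getMaxDepth())
print("best stepSize:", best_gbtr.getStepSize())
print("avg CV RMSE per grid point:", cv_model.avgMetrics)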

Notes

  • Tune the parallelism parameter of the CrossValidator to whatever fits your cluster. My nodes had 256 GB of memory and 64 cores each, so I gave every core two threads (64 * 2 = 128).

  • MLflow will automatically track the CV trials; Databricks displays a message saying so. I believe this comes from Databricks' MLflow autologging for Spark ML rather than from MLlib itself. If you want explicit control over what gets logged, see the sketch after this list.
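Outside Databricks, or when you prefer to log things yourself, a minimal hand-rolled sketch with the open-source mlflow client is shown below. It assumes mlflow is installed and a tracking URI is configured, and it reuses the variables from the code above:

import mlflow

with mlflow.start_run():
    mlflow.log_param("numFolds", 5)
    mlflow.log_metric("best_avg_cv_rmse", min(cv_model.avgMetrics))
    mlflow.log_metric("test_rmse", re.evaluate(cv_model.transform(reg_test)))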
