from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# --- Global configuration constants ---
RANDOM_SEED_VALUE= 42  # fixed seed so splits, model training, and CV are reproducible
TARGET_VARIABLE= 'target_column'  # name of the label column in clean_df
ID_COLUMN= 'id_column'  # row-identifier column; must be excluded from the feature set
# Data import/cleaning/preparation happens before this point.
# Remember: any data you later predict on must go through the identical preparation.
# Every column except the label and the ID is treated as a model feature.
excluded_cols = {TARGET_VARIABLE, ID_COLUMN}
feats = [col_name for col_name in clean_df.columns if col_name not in excluded_cols]
# Collapse the feature columns into a single 'features' vector column;
# 'keep' routes invalid/missing values into an extra bucket instead of erroring.
assembler = VectorAssembler(inputCols=feats, outputCol="features", handleInvalid='keep')
df = assembler.transform(clean_df)
# 67/33 train/test split, seeded for reproducibility.
# BUG FIX: the seed constant was misspelled RAMDOM_SEED_VALUE, which raises
# NameError at runtime; corrected to RANDOM_SEED_VALUE.
reg_train, reg_test = df.randomSplit([.67, .33], seed=RANDOM_SEED_VALUE)
# Gradient-boosted-trees regressor. The features column is set directly in the
# constructor and must match the VectorAssembler's output column name.
gbtr = GBTRegressor(
    featuresCol="features",
    labelCol=TARGET_VARIABLE,
    predictionCol="prediction_gbtr",
    seed=RANDOM_SEED_VALUE,
)
# Hyperparameter grid for cross validation: 2 x 3 x 2 = 12 candidate settings.
# addGrid() returns the builder itself, so sequential calls accumulate the grid.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(gbtr.maxDepth, [5, 3])
grid_builder.addGrid(gbtr.stepSize, [0.01, 0.5, 0.1])
grid_builder.addGrid(gbtr.minInfoGain, [0.01, 0.1])
paramgrid = grid_builder.build()
# RMSE evaluator the cross validator uses to score each candidate model.
# NOTE(review): the variable name `re` shadows the stdlib `re` module; it is
# kept here because later code refers to it by this name.
re = RegressionEvaluator(
    labelCol=TARGET_VARIABLE,
    predictionCol='prediction_gbtr',
    metricName='rmse',
)
# 5-fold cross validation over the parameter grid.
# `parallelism` is the number of models evaluated concurrently — tune it to the
# cluster (here sized as 2 threads per core on a 64-core node; see notes below).
cv = CrossValidator(
    estimator=gbtr,
    estimatorParamMaps=paramgrid,
    evaluator=re,
    numFolds=5,
    seed=RANDOM_SEED_VALUE,
    parallelism=64 * 2,
    collectSubModels=False,
)
# BUG FIX: fit on the training split only. The original called cv.fit(df),
# which made the train/test split dead code and leaked the held-out reg_test
# rows into model selection, so any later evaluation on reg_test is optimistic.
cv_model = cv.fit(reg_train)
# Notes:
# - Tune the `parallelism` parameter in CrossValidator to whatever fits your
#   cluster. My nodes had 256 GB of memory and 64 cores each, so I gave two
#   threads to each core.
# - MLflow automatically tracks the CV trials; Databricks displays a message
#   saying so — that is a built-in integration with Spark's MLlib.