Cross Validation in Spark

from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# global variables
RANDOM_SEED_VALUE = 42
TARGET_VARIABLE = 'target_column'
ID_COLUMN = 'id_column'

# import, clean, and prepare your data here
# don't forget to apply the exact same steps to the data you will predict on!

# every column except the target and the ID goes into the feature vector
feats = [item for item in clean_df.columns if item not in [TARGET_VARIABLE, ID_COLUMN]]  # the inner list can be a tuple (TARGET_VARIABLE, ID_COLUMN) as well

assembler = VectorAssembler(inputCols=feats, outputCol="features", handleInvalid='keep')

df = assembler.transform(clean_df)

# train/test split
reg_train, reg_test = df.randomSplit([.67, .33], seed=RANDOM_SEED_VALUE)

# instantiate the model
gbtr = GBTRegressor(labelCol=TARGET_VARIABLE,
                    predictionCol="prediction_gbtr",
                    seed=RANDOM_SEED_VALUE)
gbtr.setFeaturesCol("features")

# create the parameter grid for cross validation
paramgrid = (ParamGridBuilder()
             .addGrid(gbtr.maxDepth, [5, 3])
             .addGrid(gbtr.stepSize, [0.01, 0.5, 0.1])
             .addGrid(gbtr.minInfoGain, [0.01, 0.1])
             .build())
# the outer parentheses are only there so Python accepts the line breaks; they make the chain easier to read

# regression evaluator that scores each fold by RMSE
re = RegressionEvaluator(predictionCol='prediction_gbtr',
                         labelCol=TARGET_VARIABLE,
                         metricName='rmse')

# cross validator
cv = CrossValidator(estimator=gbtr,
                    estimatorParamMaps=paramgrid,
                    evaluator=re,
                    numFolds=5,
                    seed=RANDOM_SEED_VALUE,
                    parallelism=64*2,
                    collectSubModels=False)

# fit on the training split so reg_test stays untouched as a holdout set
cv_model = cv.fit(reg_train)
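
Once cross validation finishes, you will usually want to inspect the per-combination metrics and check the winning model on the held-out split. A minimal follow-up sketch, assuming the cv_model, reg_test, and re objects defined above:

# average RMSE per parameter combination, in the same order as paramgrid
print(cv_model.avgMetrics)

# the best model, refit on the full training data
best_gbtr = cv_model.bestModel

# transform() on a CrossValidatorModel delegates to the best model
test_preds = cv_model.transform(reg_test)
print("holdout RMSE:", re.evaluate(test_preds))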

Notes

  • Tune the parallelism parameter of CrossValidator to whatever fits your cluster. My nodes had 256 GB of memory and 64 cores each, so I gave two threads to each core (parallelism = 64*2).

  • MLflow will automatically track the CV trials. Databricks displays a message saying so; I believe that comes from MLflow's autologging for Spark's MLlib. A sketch of turning it on yourself follows below.
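
Outside Databricks you can enable that tracking explicitly. A minimal sketch, assuming MLflow is installed and a tracking URI is configured; the experiment name here is made up:

import mlflow
import mlflow.pyspark.ml

mlflow.set_experiment("/gbt_cv_experiment")  # hypothetical experiment name
mlflow.pyspark.ml.autolog()  # logs params and metrics of MLlib fit() calls, including tuning estimators

with mlflow.start_run():
    cv_model = cv.fit(reg_train)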
