Full Worked Random Forest Classifier Example
Code Snippets for RandomForestClassifier - PySpark
The general plan is:
1. Extract all string columns, to encode them afterwards with StringIndexer.
2. Extract all numerical columns, to impute nulls in case the model complains about them while fitting.
3. Create a Pipeline for all the steps you want to do, e.g. StringIndexer, Imputer, OneHotEncoder, StandardScaler (though standardizing isn't needed for RandomForest), VectorAssembler (to create the "features" column). Don't forget to leave out the target variable, which has to be in binary 0/1 form (or integer for multiclass), not strings.
4. Fit the Pipeline and transform the dataframe.
5. Split the dataframe into training and testing datasets, using randomSplit. Don't forget to set the seed argument in randomSplit so you get the same results if you run the same experiment again later.
6. Instantiate the RandomForestClassifier model. At this point, don't pass the "features" column name as the featuresCol argument; we will add it later. For some reason it errored out when I included it while instantiating the model. Don't forget to set the seed here too, for reproducibility.
7. Call setFeaturesCol on your instantiated RandomForestClassifier model.
8. Fit the model to the training set.
9. Save the fitted model for later use if needed (and load it back afterwards).
10. Predict on the holdout set. Save the dataframe with those predictions, just in case.
11. Instantiate the BinaryClassificationEvaluator or the MulticlassClassificationEvaluator, depending on what you have. Call the evaluator on your predictions column.
12. Create the confusion matrix, with a workaround.
13. Create the feature importance plot, with a workaround.
The Imports
import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import OneHotEncoder, StandardScaler, VectorAssembler, StringIndexer, Imputer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
Extract String and Numerical Columns
Other than printSchema(), you can see the types of all the columns in a PySpark dataframe with df.dtypes, which returns a list of (column name, type) tuples. We will use this next.
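For example (the column names below are made up, just for illustration), the output looks something like this:

df.dtypes
# e.g. [('id', 'bigint'), ('signup_ts', 'timestamp'), ('country', 'string'), ('amount', 'double')]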
You can choose to compute whatever differences and calculations you want from your TimestampType and DateType columns, after which you might want to drop them from your dataframe.
# dropping TimestampType and DateType columns from the dataframe
df= df\
.drop(*[col[0] for col in df.dtypes if col[1] in ('timestamp','date')])
Then you want to make sure the ID column(s) and the target variable are excluded from the predictor columns,
exclude_from_Xs= ["target_variable", "id_columns"]
# the predictors, of all types
Xs= [item for item in df.columns if item not in exclude_from_Xs]
Now we're ready to have those StringType columns in one list, and all numerical predictors in another list. Make sure you're selecting only the predictors in the following step!
to_encode= [col[0] for col in df.select(*Xs).dtypes if col[1]=='string']
numerical_cols= [col[0] for col in df.select(*Xs).dtypes if col[1]!='string']
☞ Note
You can count how many columns you have of each type with the following snippet,
from collections import Counter
Counter([col[1] for col in df.dtypes])
This will give you a great overview of what you've got. For example, if you have DecimalType columns, you might want to convert them back to DoubleType; if you convert the dataframe to Pandas at any point, it will complain that the DecimalType columns are not efficient. (There's a snippet for that conversion in the Notes at the end.)
Creating a Pipeline to Do All Feature Engineering
☞ Note
I'm against imputing null values with the median. A good data scientist should look deeper into the data and understand the domain in order to replace missing values, or find another way to handle them. Still, I'm listing the Imputer here for a quick and dirty analysis if you must.
If you want to learn more about the arguments and what they mean, read the official docs. I find the 3.1 docs for PySpark a lot easier to read.
pipe_stages= []
# all string(categorical) variables will be encoded into numbers, each category by frequency of label
sindexer= StringIndexer(inputCols= to_encode,
outputCols= ["indexed_{}".format(item) for item in to_encode],
handleInvalid='keep',
stringOrderType='frequencyDesc')
pipe_stages += [sindexer] # must add each step to the Pipeline
# # dummy numerized strings into a sparse vector. (I didn't need this step, so I left it out)
# ohe= OneHotEncoder(inputCols=["indexed_{}".format(item) for item in to_encode],
# outputCols= ["indexed_ohe_{}".format(item) for item in to_encode],
# handleInvalid='keep',
# dropLast=True)
# pipe_stages += [ohe]
# impute missing numerical values, with the median (though bad practice)
imp= Imputer(inputCols= numerical_cols,
outputCols=['imputed_{}'.format(item) for item in numerical_cols],
strategy= 'median')
pipe_stages += [imp]
# create the un-standardized features vector
assembler= VectorAssembler(inputCols= ["indexed_{}".format(item) for item in to_encode] + ['imputed_{}'.format(item) for item in numerical_cols],
outputCol= "feats",
handleInvalid="keep")
pipe_stages += [assembler]
# scale all features. Maybe you want to do this before encoding the string columns?
ss= StandardScaler(inputCol="feats",
outputCol="features",
withMean= False,
withStd=True)
pipe_stages += [ss]
pipe= Pipeline(stages= pipe_stages)
Fit the Pipeline and Transform the Data
df= pipe.fit(df).transform(df)
Split the DataFrame to Train and Test
train, test= df.randomSplit([0.75, 0.25], seed=42)
# train size is 75% and test size is 25% of the data
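If you want a quick sanity check of the split (a sketch, assuming your target column is called "target" as in the rest of this example):

# rough check of the split sizes and the class balance in each part
print(train.count(), test.count())
train.groupBy("target").count().show()
test.groupBy("target").count().show()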
Instantiate RandomForestClassifier
rfc= RandomForestClassifier(numTrees=70,
maxDepth=3,
labelCol='target',
seed=42)
Read the docs for the rest of the arguments.
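As a rough sketch of what else is commonly tuned (the values below are arbitrary placeholders, not recommendations):

rfc_tuned= RandomForestClassifier(numTrees=70,
                                  maxDepth=3,
                                  maxBins=32,                   # bins used to discretize continuous features
                                  featureSubsetStrategy='auto', # how many features are considered per split
                                  subsamplingRate=1.0,          # fraction of rows used to train each tree
                                  impurity='gini',              # or 'entropy'
                                  labelCol='target',
                                  seed=42)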
Set Features Column on the Instantiated RF Classifier
rfc.setFeaturesCol("features")
Fit to Training Set
rfc_model= rfc.fit(train)
Save the Fitted Model
rfc_model.save("/FileStore/my_folder/fitted_models/RFC_Jul272021")
# this is a DataBricks workspace DBFS path
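Note that save() errors out if the path already exists; as far as I know, you can overwrite through the model's writer:

# overwrite a previously saved model at the same path
rfc_model.write().overwrite().save("/FileStore/my_folder/fitted_models/RFC_Jul272021")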
Loading the Fitted Model in a Later Notebook
from pyspark.ml.classification import RandomForestClassificationModel
rfc_model= RandomForestClassificationModel.load("/FileStore/my_folder/fitted_models/RFC_Jul272021")
Predict on Holdout Set
preds= rfc_model.transform(test)
A very important and awesome thing about Spark: the predictions are columns added to the original dataframe, so you don't lose anything, and you don't need to merge back to know which row each prediction belongs to.
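A quick way to see that (adjust the column names to your data):

# the original columns are still there; the model appended its prediction columns to them
preds.select("target", "prediction", "probability").show(5, truncate=False)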
Save the Dataframe with Predictions, Just in Case
preds.write.mode("overwrite")\
.option("header","true").format("parquet").save("my_path")
Scoring (Evaluating) RFC Model
# Instantiate the evaluator
# similarly with MulticlassClassificationEvaluator
bce= BinaryClassificationEvaluator(rawPredictionCol= "rawPrediction",
labelCol="target",
metricName= "areaUnderROC")
# Apply
bce.evaluate(preds)
In PySpark, when predicting with a classifier, you'll get 3 new columns, whose names are controlled by predictionCol, probabilityCol and rawPredictionCol:
- prediction is the 1/0 label of your binary classification.
- probability is the equivalent of predict_proba in Scikit-Learn, i.e. the probabilities of the positive and negative classes, with the same default 0.5 threshold: above 0.5 it's labeled class 1, below 0.5 it's labeled class 0.
- rawPrediction is mostly for Spark's internal use; it's the column you feed to the evaluator.
If you really want to, you can change those default names while instantiating the classifier. In our example, RandomForestClassifier has arguments to change the names of those columns.
There are two metrics for BinaryClassificationEvaluator: "areaUnderROC", which is the default, and "areaUnderPR".
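As a sketch of both points (the column names rfc_prediction etc. are hypothetical, just to show the arguments exist):

# rename the three output columns while instantiating the classifier
rfc_named= RandomForestClassifier(numTrees=70,
                                  maxDepth=3,
                                  labelCol='target',
                                  seed=42,
                                  predictionCol='rfc_prediction',
                                  probabilityCol='rfc_probability',
                                  rawPredictionCol='rfc_rawPrediction')

# or score with the Precision-Recall metric instead of the default ROC one
bce_pr= BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                      labelCol="target",
                                      metricName="areaUnderPR")
bce_pr.evaluate(preds)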
Creating the Confusion Matrix, with a Workaround
from pyspark.mllib.evaluation import MulticlassMetrics
preds_float= preds\
.select("prediction", "target")\
.withColumn("target", f.col("target").cast(FloatType()))\
.orderBy("prediction")
cm= MulticlassMetrics(preds_float.rdd.map(tuple))
#print(cm.confusionMatrix().toArray())
#show the confusion matrix as a pandas df for clearer presentation
# rows are the actual labels, columns are the predicted labels, ordered by label ascending (0 then 1)
pd.DataFrame(cm.confusionMatrix().toArray(),
             columns= ["predicted negative", "predicted positive"],
             index= ["actual negative", "actual positive"])
I chose to go fancy and show the confusion matrix as a Pandas dataframe, but the print statement I commented out is enough to show it; toArray converts the pyspark.mllib.linalg.Matrix returned by confusionMatrix() into a NumPy array.
You can substitute the words "positive" and "negative" with whatever your classes actually represent, for a cleaner and easier to understand presentation.
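While you have the MulticlassMetrics object, you can also pull a few summary numbers out of it (a sketch, assuming label 1.0 is your positive class):

# overall accuracy, plus precision and recall for the positive class (label 1.0)
print(cm.accuracy)
print(cm.precision(1.0))
print(cm.recall(1.0))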
Source: there are a couple of resources out there; this is the one I got the heart of this code from. There is another I didn't follow, because I doubted its feature names end up in the correct order with respect to the feature importances; my dataframe has several hundred columns.
Plot Top 20 Most Important Features, According to RandomForestClassifier
def get_features_importance(model= rfc_model):
    """
    Careful! This function is hard-coded for the feature names:
    it relies on the `to_encode` and `numerical_cols` lists created earlier for the Xs names.
    """
    # featureImportances is a SparseVector: .values holds the importances, .indices their positions
    sparse= model.featureImportances
    vals= sparse.values
    idx= sparse.indices
    # map each vector index back to the original column name
    feature_names= [(to_encode + numerical_cols)[i] for i in idx]
    importances_df= pd.DataFrame(zip(feature_names, vals), columns=["feature", "value"])
    return importances_df
# note: I created the "features" vector from combining `to_encode` and `numerical_cols` lists, in that order. Thus, I can retrieve it in the same manner.
rfc_importance_df= get_features_importance().sort_values(by='value',ascending=False)
rfc_importance_df.head(20)
plt.figure(figsize=(9,6))
plt.barh(rfc_importance_df['feature'][:20],
rfc_importance_df['value'][:20])
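Since the dataframe is sorted in descending order, barh draws the most important feature at the bottom; a small optional touch-up to flip that and label the plot:

plt.gca().invert_yaxis()   # put the most important feature on top
plt.xlabel("importance")
plt.title("Top 20 feature importances - RandomForestClassifier")
plt.show()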
☞ Notes
To quickly convert all DecimalType columns to DoubleType columns in PySpark,
decimal_cols= [str(col[0]) for col in df.dtypes if col[1].startswith("decimal")]
df1= df
for c in decimal_cols:
df1= df1\
.withColumn(c, f.col(c).cast("double"))
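An equivalent sketch doing the same cast in a single select (using the same decimal_cols list):

df1= df.select([f.col(c).cast("double") if c in decimal_cols else f.col(c) for c in df.columns])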