Logistic Regression in Spark ML

Dhiraj Rai
8 min read · Nov 2, 2018

The intent of this blog is to demonstrate binary classification in PySpark. The steps involved in developing a classification model in PySpark are as follows:

1) Initialize a Spark session

2) Download and read the dataset

3) Developing an initial understanding of the data

4) Handling missing values

5) Scaling the features

6) Train test split

7) Imbalance handling

8) Feature selection

9) Performance evaluation

Highlights

  1. Imbalance handling using class weights
  2. Hyper-parameter tuning using ParamGrid
  3. K fold cross validation

Please find the complete ipynb file here

# Initializing a Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("diabetes") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Download and read the dataset

For the purpose of demonstration, I am using the Pima Indians Diabetes Database. The dataset can be downloaded from this link: https://www.kaggle.com/abhikaggle8/pima-diabetes-classification/data

# Let us begin by reading "diabetes.csv" and creating a Spark dataframe named 'raw_data'

raw_data = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("diabetes.csv")
raw_data.columns

This gives us a list of columns

['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI','DiabetesPedigreeFunction', 'Age', 'Outcome']

So, the data contains 9 columns. Each of these is described as follows:

The dataset consists of several medical predictor (independent) variables and one target (dependent) variable, Outcome. Independent variables include the number of pregnancies the patient has had, their BMI, insulin level, age, and so on.

Columns

1) Pregnancies: Number of times pregnant (numeric)

2) Glucose: Plasma glucose concentration at 2 hours in an oral glucose tolerance test (numeric)

3) Blood Pressure: Diastolic blood pressure (mm Hg) (numeric)

4) Skin Thickness: Triceps skin fold thickness (mm) (numeric)

5) Insulin: 2-Hour serum insulin (mu U/ml) (numeric)

6) BMI: Body mass index (weight in kg/(height in m)²) (numeric)

7) Diabetes Pedigree Function: Diabetes pedigree function (numeric)

8) Age: Age of the person (years)

9) Outcome: Class variable (0 or 1)

The task is to build a machine learning model that accurately predicts whether or not the patients in the dataset have diabetes.

raw_data.describe().select("Summary","Pregnancies","Glucose","BloodPressure").show()

This gives us a summary of the selected fields, including the count, mean, standard deviation, minimum and maximum values:

+-------+------------------+-----------------+------------------+
|Summary| Pregnancies| Glucose| BloodPressure|
+-------+------------------+-----------------+------------------+
| count| 768| 768| 768|
| mean|3.8450520833333335| 120.89453125| 69.10546875|
| stddev| 3.36957806269887|31.97261819513622|19.355807170644777|
| min| 0| 0| 0|
| max| 17| 199| 122|
+-------+------------------+-----------------+------------------+
raw_data.describe().select("Summary","SkinThickness","Insulin").show()+-------+------------------+------------------+
|Summary| SkinThickness| Insulin|
+-------+------------------+------------------+
| count| 768| 768|
| mean|20.536458333333332| 79.79947916666667|
| stddev|15.952217567727642|115.24400235133803|
| min| 0| 0|
| max| 99| 846|
+-------+------------------+------------------+
raw_data.describe().select("Summary","BMI","DiabetesPedigreeFunction","Age").show()+-------+------------------+------------------------+---------------|Summary|BMI|DiabetesPedigreeFunction| Age|
+-------+------------------+------------------------+---------------
| count| 768 | 768 | 768|
| mean|31.99 | 0.4718 | 33.240 |
| stddev| 7.8841|0.33132 | 11.7602|
| min| 0.0 |0.078 | 21|
| max| 67.1|2.42 | 81|
+-------+------------------+------------------------+---------------

Looking at the above tables, it can be observed that the minimum value for fields such as "Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin" and "BMI" is zero (0), which seems impractical to me (except for "Pregnancies").

Therefore, let us replace all the zeros in the above-mentioned fields (except "Pregnancies") with NaN.

import numpy as np
from pyspark.sql.functions import when

raw_data = raw_data.withColumn("Glucose", when(raw_data.Glucose == 0, np.nan).otherwise(raw_data.Glucose))
raw_data = raw_data.withColumn("BloodPressure", when(raw_data.BloodPressure == 0, np.nan).otherwise(raw_data.BloodPressure))
raw_data = raw_data.withColumn("SkinThickness", when(raw_data.SkinThickness == 0, np.nan).otherwise(raw_data.SkinThickness))
raw_data = raw_data.withColumn("BMI", when(raw_data.BMI == 0, np.nan).otherwise(raw_data.BMI))
raw_data = raw_data.withColumn("Insulin", when(raw_data.Insulin == 0, np.nan).otherwise(raw_data.Insulin))
raw_data.select("Insulin","Glucose","BloodPressure","SkinThickness","BMI").show(5)

So, it can be observed that all the zeros are replaced by NaN.

+-------+-------+-------------+-------------+----+
|Insulin|Glucose|BloodPressure|SkinThickness| BMI|
+-------+-------+-------------+-------------+----+
| NaN| 148.0| 72.0| 35.0|33.6|
| NaN| 85.0| 66.0| 29.0|26.6|
| NaN| 183.0| 64.0| NaN|23.3|
| 94.0| 89.0| 66.0| 23.0|28.1|
| 168.0| 137.0| 40.0| 35.0|43.1|
+-------+-------+-------------+-------------+----+
only showing top 5 rows

Now, we can simply impute the NaN values by calling an Imputer :)

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"],
    outputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"])
model = imputer.fit(raw_data)
raw_data = model.transform(raw_data)
raw_data.show(5)
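By default the Imputer fills the NaNs with the column mean. If some of these columns look heavily skewed (Insulin, for instance, has a long tail), the median may be a more robust choice; here is a minimal sketch of that variant (my addition, not part of the original notebook):

# Hypothetical variant: impute with the median instead of the mean
imputer_median = Imputer(
    inputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"],
    outputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"],
    strategy="median")
# raw_data = imputer_median.fit(raw_data).transform(raw_data)  # use in place of the mean imputer above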

In addition, if we look at the "Pregnancies" column in raw_data, it can be seen that the maximum count goes up to 17, which is quite unbelievable. This may be an outlier, but we will discuss outlier detection and removal some other time; a quick check is sketched below.
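As a small aside (my addition, using the usual 1.5 × IQR rule of thumb), here is how such suspicious values could be flagged with DataFrame.approxQuantile:

# Minimal IQR-based outlier check for "Pregnancies" (illustrative only)
q1, q3 = raw_data.approxQuantile("Pregnancies", [0.25, 0.75], 0.0)
iqr = q3 - q1
upper_bound = q3 + 1.5 * iqr
print("Upper bound for Pregnancies: {}".format(upper_bound))
print(raw_data.filter(raw_data.Pregnancies > upper_bound).count())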

Now, let us combine all the features in one single feature vector.

cols=raw_data.columns
cols.remove("Outcome")
# Let us import the vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols,outputCol="features")
# Now let us use the transform method to transform our dataset
raw_data=assembler.transform(raw_data)
raw_data.select("features").show(truncate=False)

A single row of feature vector looks as follows:

[6.0,148.0,72.0,35.0,155.5482233502538,33.6,0.627,50.0]


StandardScaler

So we have created a feature vector. Now let us use StandardScaler to scale the newly created "features" column.

from pyspark.ml.feature import StandardScaler
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
raw_data=standardscaler.fit(raw_data).transform(raw_data)
raw_data.select("features","Scaled_features").show(5)


+--------------------+--------------------+
| features| Scaled_features|
+--------------------+--------------------+
|[6.0,148.0,72.0,3...|[1.78063837321943...|
|[1.0,85.0,66.0,29...|[0.29677306220323...|
|[8.0,183.0,64.0,2...|[2.37418449762590...|
|[1.0,89.0,66.0,23...|[0.29677306220323...|
|[0.0,137.0,40.0,3...|[0.0,4.5012560836...|
+--------------------+--------------------+
only showing top 5 rows
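Note that, by default, Spark's StandardScaler only scales each feature to unit standard deviation and does not centre it (withStd=True, withMean=False). If you also want zero-mean features, a variant along these lines should work (the column name Scaled_features_centred is just an illustrative choice):

# Hypothetical variant: scale to unit standard deviation AND centre each feature at zero
standardscaler_centred = StandardScaler(
    inputCol="features",
    outputCol="Scaled_features_centred",
    withMean=True,
    withStd=True)
# raw_data = standardscaler_centred.fit(raw_data).transform(raw_data)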

Train, test split

Now that the preprocessing of the data is complete, let us split the dataset into a training and a testing set.

train, test = raw_data.randomSplit([0.8, 0.2], seed=12345)

Let us check whether there is an imbalance in the dataset.

dataset_size=float(train.select("Outcome").count())
numPositives=train.select("Outcome").where('Outcome == 1').count()
per_ones=(float(numPositives)/float(dataset_size))*100
numNegatives=float(dataset_size-numPositives)
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

The number of ones are 206
Percentage of ones are 34.2762063228

Imbalance handling

Since the percentage of ones in the dataset is just 34.27%, there is clearly an imbalance in the dataset. Thankfully, in the case of logistic regression we have a technique called "class weighting". I recommend reading https://stackoverflow.com/questions/33372838/dealing-with-unbalanced-datasets-in-spark-mllib for a good explanation.

In our training set (train) we have 34.27% positives and 65.73% negatives. Since negatives are in the majority, the logistic loss objective function should treat the positive class (Outcome == 1) with a higher weight. For this purpose we calculate the BalancingRatio as follows:

BalancingRatio= numNegatives/dataset_size

Then, for every row with Outcome == 1 we put BalancingRatio in the column "classWeights", and for every row with Outcome == 0 we put 1 - BalancingRatio in "classWeights".

In this way, we assign a higher weight to the minority class (i.e. the positive class).

BalancingRatio= numNegatives/dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

BalancingRatio = 0.657237936772

Now creating a new column named “classWeights” in the “train” dataset

train=train.withColumn("classWeights", when(train.Outcome == 1,BalancingRatio).otherwise(1-BalancingRatio))
train.select("classWeights").show(5)

Feature selection

We use the ChiSqSelector provided by Spark ML for selecting significant features. Please refer to my previous blog for more details about the working of the ChiSqSelector.

# Feature selection using ChiSqSelector
from pyspark.ml.feature import ChiSqSelector

css = ChiSqSelector(featuresCol='Scaled_features', outputCol='Aspect', labelCol='Outcome', fpr=0.05)
train = css.fit(train).transform(train)
test = css.fit(test).transform(test)
test.select("Aspect").show(5, truncate=False)
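One detail worth flagging (my note, not from the original notebook): fitting the selector separately on the test set lets the test labels influence feature selection. A leakage-free variant fits the selector once on the training data and reuses the fitted model for both sets, in place of the two fit calls above:

# Leakage-free variant: fit the selector on the training set only
css_model = css.fit(train)
train = css_model.transform(train)
test = css_model.transform(test)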

Building a classification model using Logistic Regression (LR)

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Outcome", featuresCol="Aspect",weightCol="classWeights",maxIter=10)
model=lr.fit(train)
predict_train=model.transform(train)
predict_test=model.transform(test)
predict_test.select("Outcome","prediction").show(10)

A sample of generated predictions is as follows

+-------+----------+
|Outcome|prediction|
+-------+----------+
| 0| 0.0|
| 0| 0.0|
| 0| 0.0|
| 0| 0.0|
| 0| 0.0|
| 1| 0.0|
| 0| 0.0|
| 0| 0.0|
| 0| 0.0|
| 0| 1.0|
+-------+----------+
only showing top 10 rows

Evaluating the model

Now let us evaluate the model using the BinaryClassificationEvaluator class in Spark ML. BinaryClassificationEvaluator uses areaUnderROC as its default performance metric. Read more about areaUnderROC here.

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="Outcome")
predict_test.select("Outcome","rawPrediction","prediction","probability").show(5)
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test)))

The output is as follows:

+-------+--------------------+----------+--------------------+
|Outcome| rawPrediction|prediction| probability|
+-------+--------------------+----------+--------------------+
| 0|[2.58888023648551...| 0.0|[0.93014249279728...|
| 0|[2.32098145085482...| 0.0|[0.91059987057264...|
| 0|[1.68081620680194...| 0.0|[0.84301258010142...|
| 0|[0.64946166218389...| 0.0|[0.65688913922505...|
| 0|[1.78997774283908...| 0.0|[0.85692454770533...|
+-------+--------------------+----------+--------------------+
only showing top 5 rows
The area under ROC for train set is 0.838687476957
The area under ROC for test set is 0.844700460829
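Since the classes are imbalanced, the area under the precision-recall curve is often a useful companion metric. BinaryClassificationEvaluator supports it through its metricName parameter; the snippet below is an optional addition, not part of the original notebook:

# Optional: also evaluate the area under the precision-recall curve
pr_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="Outcome",
    metricName="areaUnderPR")
print("The area under PR for test set is {}".format(pr_evaluator.evaluate(predict_test)))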

Hyperparameters

To this point we have developed a classification model using logistic regression. However, the behaviour of logistic regression depends on a number of parameters, and so far we have worked only with the defaults. Now, let us try to tune the hyperparameters and see whether it makes any difference.

List of tunable parameters in LR

1) aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)

2) elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)

3) family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)

4) featuresCol: features column name. (default: features, current: Aspect)

5) fitIntercept: whether to fit an intercept term. (default: True)

6) labelCol: label column name. (default: label, current: Outcome)

7) maxIter: max number of iterations (>= 0). (default: 100, current: 10)

8) predictionCol: prediction column name. (default: prediction)

9) probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)

10) rawPredictionCol: raw prediction (a.k.a. confidence) column name. (default: rawPrediction)

11) regParam: regularization parameter (>= 0). (default: 0.0)

12) standardization: whether to standardize the training features before fitting the model. (default: True)

13) threshold: Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set, they must match, e.g. if threshold is p, then thresholds must be equal to [1-p, p]. (default: 0.5)

14) thresholds: Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values > 0, excepting that at most one value may be 0. The class with the largest value p/t is predicted, where p is the original probability of that class and t is the class's threshold. (undefined)

15) tol: the convergence tolerance for iterative algorithms (>= 0). (default: 1e-06)

16) weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (current: classWeights)
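A list like the one above, including the current values, can be printed directly from the estimator:

# Print all tunable parameters of lr, with their docs, defaults and current values
print(lr.explainParams())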

Now let us tune some of these parameters and observe their effect on the performance of the algorithm.

For the purpose of hyperparameter tuning we will consider the following parameters:

1) aggregationDepth [2, 5, 10]

2) elasticNetParam [0.0, 0.5, 1.0]

3) fitIntercept [True / False]

4) maxIter [10, 100, 1000]

5) regParam [0.01, 0.5, 2.0]

Let us define a parameter grid as follows:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder()\
.addGrid(lr.aggregationDepth,[2,5,10])\
.addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
.addGrid(lr.fitIntercept,[False, True])\
.addGrid(lr.maxIter,[10, 100, 1000])\
.addGrid(lr.regParam,[0.01, 0.5, 2.0]) \
.build()
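Note that this grid contains 3 × 3 × 2 × 3 × 3 = 162 parameter combinations, so the 5-fold cross-validation below will train 162 × 5 = 810 models, which is why it takes a while.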

K-fold cross validation

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run cross validations
cvModel = cv.fit(train)
# This will likely take a fair amount of time because of the number of models being trained and evaluated
predict_train=cvModel.transform(train)
predict_test=cvModel.transform(test)
print("The area under ROC for train set after CV is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set after CV is {}".format(evaluator.evaluate(predict_test)))

The output is as follows:

The area under ROC for train set after CV is 0.843382081848
The area under ROC for test set after CV is 0.846697388633

For this problem, we have not seen any significant improvement in the performance metric after tuning the hyperparameters. It looks like the default parameters have worked well here. However, hyperparameter tuning is an important aspect of solving problems with ML and should not be ignored.
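If you are curious which parameter combination the cross-validation actually selected, the fitted CrossValidatorModel exposes the winning model and the per-combination average metrics (a small inspection sketch, added here for convenience):

# Inspect the model selected by cross-validation
best_lr = cvModel.bestModel
print(best_lr.extractParamMap())           # parameter map of the winning LogisticRegressionModel
print(best_lr.coefficients, best_lr.intercept)
print(max(cvModel.avgMetrics))             # best average areaUnderROC across the folds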

Future work:

A few important things still remain which I have not covered in this blog:

1) Outlier detection

2) Imbalance handling

The class weighting technique which we have used in this work is, currently, suitable only for logistic regression. However, for other algorithms such as Random Forest, Naive Bayes, or Support Vector Machines, we may need techniques such as the Synthetic Minority Oversampling Technique (SMOTE); a simple resampling placeholder is sketched below.
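SMOTE is not available out of the box in Spark ML. As a rough placeholder (my sketch, not a substitute for SMOTE), plain random oversampling of the minority class can be written in a few lines:

# Rough sketch: random oversampling of the minority (Outcome == 1) class
minority = train.filter(train.Outcome == 1)
majority = train.filter(train.Outcome == 0)
ratio = majority.count() / float(minority.count())
# Sample the minority class with replacement until the classes are roughly balanced
oversampled_minority = minority.sample(withReplacement=True, fraction=ratio, seed=12345)
balanced_train = majority.union(oversampled_minority)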

Useful links

https://spark.apache.org/docs/2.1.0/ml-tuning.html

https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html

https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html
