Feature Engineering in pyspark — Part I
The most commonly used data pre-processing techniques in Spark are as follows
1) VectorAssembler
2) Bucketing
3) Scaling and normalization
4) Working with categorical features
5) Text data transformers
6) Feature manipulation
7) PCA
Please find the complete jupyter notebook here
This blog is mostly intended for those who are new to Spark programming, especially pyspark. Therefore, it follows an illustrative approach to explain the commonly used data pre-processing techniques.
But before we do anything, we need to initialize a Spark session. This can be done as follows
# Initializing a Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
Now let us download some sample datasets to play along with. The datasets I have used in the illustrations can be downloaded from here
For the purpose of demonstration we will be using four different datasets
1) retail-data/by-day
2) simple-ml-integers
3) simple-ml
4) simple-ml-scaling
Let us begin by reading “retail-data/by-day”, which is in .csv format, and save it into a Spark dataframe named ‘sales’
sales = spark.read.format("csv").option("header","true").option("inferSchema", "true").load(r"data/retail-data/by-day/*.csv").coalesce(5).where("Description IS NOT NULL")
Let us read the parquet files in “simple-ml-integers” and make a Spark dataframe named ‘fakeIntDF’
fakeIntDF=spark.read.parquet("/home/spark/DhirajR/Spark/feature_engineering/data/simple-ml-integers")
Let us read the JSON files in “simple-ml” and make a Spark dataframe named ‘simpleDF’
simpleDF=spark.read.json(r"/home/spark/DhirajR/Spark/feature_engineering/data/simple-ml")
Let us read the parquet files in “simple-ml-scaling” and make a Spark dataframe named ‘scaleDF’
scaleDF=spark.read.parquet(r"/home/spark/DhirajR/Spark/feature_engineering/data/simple-ml-scaling")
Please find the complete jupyter notebook here
Vector assembler
The vector assembler is basically used to concatenate all the features into a single vector, which can then be passed to an estimator or ML algorithm. In order to demo the VectorAssembler we will use the ‘fakeIntDF’ dataframe which we created in the previous steps.
Let us see what columns we have in ‘fakeIntDF’ dataframe
fakeIntDF.cache()
fakeIntDF.show()

+----+----+----+
|int1|int2|int3|
+----+----+----+
| 7| 8| 9|
| 1| 2| 3|
| 4| 5| 6|
+----+----+----+
So we have three columns, int1, int2 and int3, each containing integer values.
First, import the VectorAssembler from pyspark.ml.feature. Once the VectorAssembler is imported, we need to create an object of it; here I will create an object named “assembler”. The above result shows that we have three features in ‘fakeIntDF’, i.e. int1, int2, int3. Let us create the assembler object so as to combine the three features into a single column named “features”. At last, let us use the transform method to transform our dataset.
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["int1", "int2", "int3"],outputCol="features")
assembler.transform(fakeIntDF).show()
The output is as follows
+----+----+----+-------------+
|int1|int2|int3| features|
+----+----+----+-------------+
| 7| 8| 9|[7.0,8.0,9.0]|
| 1| 2| 3|[1.0,2.0,3.0]|
| 4| 5| 6|[4.0,5.0,6.0]|
+----+----+----+-------------+
The above result shows that a new column named “features” has been created, which combines “int1”, “int2” and “int3” into a single vector.
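As a quick aside (not part of the original notebook), here is a minimal sketch of how the assembled “features” column could be fed to an ML algorithm; the choice of KMeans and its parameters are purely illustrative:
from pyspark.ml.clustering import KMeans
# Assemble the features and fit an illustrative two-cluster KMeans model on them
assembled = assembler.transform(fakeIntDF)
kmeans = KMeans(featuresCol="features", k=2, seed=1)
kmeans.fit(assembled).transform(assembled).show()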
Please find the complete jupyter notebook here
Bucketing
Bucketing is the most straightforward approach for converting continuous variables into categorical ones. In pyspark, the task of bucketing can be easily accomplished using the Bucketizer class.
Firstly, we need to create the bucket borders (the split points), e.g. bucketBorders = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]. Next, we create an object of the Bucketizer class with these splits and apply its transform method to our target dataframe.
Let us create a simple dataframe for demo purpose
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])
Then we can import the Bucketizer class, create an object of this class, and apply the transform method to the dataframe we just created.
from pyspark.ml.feature import Bucketizer
bucketBorders=[-float("inf"), -0.5, 0.0, 0.5, float("inf")]
bucketer=Bucketizer().setSplits(bucketBorders).setInputCol("features").setOutputCol("Buckets")
bucketer.transform(dataFrame).show()
The output looks as follows. Note that each bucket covers the half-open interval [a, b) between two consecutive splits (only the last bucket also includes its upper bound), which is why -0.5 lands in bucket 1.0:
+--------+-------+
|features|Buckets|
+--------+-------+
| -999.9| 0.0|
| -0.5| 1.0|
| -0.3| 1.0|
| 0.0| 2.0|
| 0.2| 2.0|
| 999.9| 3.0|
+--------+-------+
Scaling and normalization
Scaling and normalization is another common task that we come across while handling continuous variables. It is not always imperative, but it is highly recommended to scale and normalize the features before applying an ML algorithm, in order to avoid the risk of the algorithm being overly sensitive to certain features simply because of their scale.
Spark ML provides us with a class “StandardScaler” for easy scaling and normalization of features.
Let us use the scaleDF dataframe that we had created previously to demonstrate the StandardScaler in pyspark.
scaleDF.show()

+---+--------------+
| id| features|
+---+--------------+
| 0|[1.0,0.1,-1.0]|
| 1| [2.0,1.1,1.0]|
| 0|[1.0,0.1,-1.0]|
| 1| [2.0,1.1,1.0]|
| 1|[3.0,10.1,3.0]|
+---+--------------+

from pyspark.ml.feature import StandardScaler
Scalerizer=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
Scalerizer.fit(scaleDF).transform(scaleDF).show()
The output looks as follows:
+---+--------------+-----------------------------+
|id |features      |Scaled_features              |
+---+--------------+-----------------------------+
|0  |[1.0,0.1,-1.0]|[1.195228,0.023376,-0.597614]|
|1  |[2.0,1.1,1.0] |[2.390457,0.257138,0.597614] |
|0  |[1.0,0.1,-1.0]|[1.195228,0.023376,-0.597614]|
|1  |[2.0,1.1,1.0] |[2.390457,0.257138,0.597614] |
|1  |[3.0,10.1,3.0]|[3.585682,2.360999,1.792842] |
+---+--------------+-----------------------------+
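A small aside, not in the original notebook: by default Spark’s StandardScaler only divides by the standard deviation (withMean=False), which is why the scaled values above are not centred around zero. If zero-mean centring is also required, it can be requested explicitly; a minimal sketch:
# Scale to unit standard deviation and also subtract the mean (withMean=True)
centering_scaler = StandardScaler(withMean=True, withStd=True, inputCol="features", outputCol="Centred_scaled_features")
centering_scaler.fit(scaleDF).transform(scaleDF).show(truncate=False)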
MinMaxScaler
The StandardScaler standardizes the features to unit standard deviation (and, optionally, zero mean). Sometimes, we come across situations where we need to scale values within a given range (i.e. between a chosen min and max). For such a task Spark ML provides a class named MinMaxScaler.
The MinMaxScaler works much like the StandardScaler; the only difference is that we can provide the minimum and maximum values within which we wish to scale the features.
For the sake of illustration, let us scale the features in the range 5 to 10.
from pyspark.ml.feature import MinMaxScaler
# Let us create an object of MinMaxScaler class
MinMaxScalerizer=MinMaxScaler().setMin(5).setMax(10).setInputCol("features").setOutputCol("MinMax_Scaled_features")
MinMaxScalerizer.fit(scaleDF).transform(scaleDF).show()
The output looks like this:
+---+--------------+----------------------+
| id| features|MinMax_Scaled_features|
+---+--------------+----------------------+
| 0|[1.0,0.1,-1.0]| [5.0,5.0,5.0]|
| 1| [2.0,1.1,1.0]| [7.5,5.5,7.5]|
| 0|[1.0,0.1,-1.0]| [5.0,5.0,5.0]|
| 1| [2.0,1.1,1.0]| [7.5,5.5,7.5]|
| 1|[3.0,10.1,3.0]| [10.0,10.0,10.0]|
+---+--------------+----------------------+
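To see where these numbers come from: the value 2.0 in the first feature (column minimum 1.0, maximum 3.0) is rescaled to (2.0 - 1.0) / (3.0 - 1.0) × (10 - 5) + 5 = 7.5, which matches the second row above.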
MaxAbsScaler
Sometimes we need to scale features between -1 and 1. The MaxAbsScaler does exactly this, by dividing each feature by its maximum absolute value.
from pyspark.ml.feature import MaxAbsScaler
# Let us create an object of the MaxAbsScaler class
MaxAbsScalerizer=MaxAbsScaler().setInputCol("features").setOutputCol("MaxAbs_Scaled_features")
MaxAbsScalerizer.fit(scaleDF).transform(scaleDF).show(truncate=False)
The output looks like this:
+---+--------------+-----------------------------+
|id |features      |MaxAbs_Scaled_features       |
+---+--------------+-----------------------------+
|0  |[1.0,0.1,-1.0]|[0.333333,0.009901,-0.333333]|
|1  |[2.0,1.1,1.0] |[0.666667,0.108911,0.333333] |
|0  |[1.0,0.1,-1.0]|[0.333333,0.009901,-0.333333]|
|1  |[2.0,1.1,1.0] |[0.666667,0.108911,0.333333] |
|1  |[3.0,10.1,3.0]|[1.0,1.0,1.0]                |
+---+--------------+-----------------------------+
So it is observed that all the features got scaled to between -1 and 1.
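For instance, the maximum absolute value of the second feature is 10.1, so 0.1 is scaled to 0.1 / 10.1 ≈ 0.0099 and 10.1 is scaled to exactly 1.0.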
ElementwiseProduct
What differentiates ElementwiseProduct from the previously mentioned scalers is the fact that, in ElementwiseProduct, each feature is scaled by a user-supplied multiplying factor.
The code snippet below will scale feature#1 → 10 times, feature#2 → 0.1 times and feature#3 → -1 times.
For example → the features [10, 20, 30] if scaled by [10, 0.1, -1] will become [100, 2.0, -30]
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
# Let us define a scaling vector
ScalebyVector=Vectors.dense([10,0.1,-1])
# Let us create an object of the class ElementwiseProduct
ScalingUp=ElementwiseProduct().setScalingVec(ScalebyVector).setInputCol("features").setOutputCol("ElementWiseProduct")
# Let us transform
ScalingUp.transform(scaleDF).show()
The output looks like this:
+---+--------------+-------------------------------+
|id |features |ElementWiseProduct |
+---+--------------+-------------------------------+
|0 |[1.0,0.1,-1.0]|[10.0,0.010,1.0] |
|1 |[2.0,1.1,1.0] |[20.0,0.110,-1.0] |
|0 |[1.0,0.1,-1.0]|[10.0,0.010,1.0] |
|1 |[2.0,1.1,1.0] |[20.0,0.110,-1.0] |
|1 |[3.0,10.1,3.0]|[30.0,1.01,-3.0] |
+---+--------------+-------------------------------+
So it is observed that each feature got multiplied by the corresponding element of the scaling vector.
Normalizer
The Normalizer scales each row’s feature vector by one of several power norms, specified through the parameter “p”. For example, p = 1 uses the L1 (Manhattan) norm and p = 2 uses the L2 (Euclidean) norm.
In fact, p can also be infinity.
from pyspark.ml.feature import Normalizer
l1_norm=Normalizer().setP(1).setInputCol("features").setOutputCol("l1_norm")
l2_norm=Normalizer().setP(2).setInputCol("features").setOutputCol("l2_norm")
linf_norm=Normalizer().setP(float("inf")).setInputCol("features").setOutputCol("linf_norm")
# Let us transform
l1_norm.transform(scaleDF).show(truncate=False)
The output looks as follows:
+---+--------------+-----------------------------------------------+
|id |features |l1_norm |
+---+--------------+-----------------------------------------------+
|0 |[1.0,0.1,-1.0]|[0.47619,0.047619,-0.47619]|
|1 |[2.0,1.1,1.0] |[0.48780,0.26829,0.24390] |
|0 |[1.0,0.1,-1.0]|[0.47619,0.047619,-0.47619]|
|1 |[2.0,1.1,1.0] |[0.48780,0.26829,0.24390] |
|1 |[3.0,10.1,3.0]|[0.18633,0.62732,0.18633] |
+---+--------------+-----------------------------------------------+
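The l2_norm and linf_norm objects defined above can be applied in exactly the same way (their output is omitted here for brevity):
l2_norm.transform(scaleDF).show(truncate=False)
linf_norm.transform(scaleDF).show(truncate=False)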
Please find the complete jupyter notebook here
StringIndexer (Converting strings to numerical values)
Most of the ML algorithms require converting categorical features into numerical ones.
Spark’s StringIndexer maps strings to different numerical values. We will use the simpleDF dataframe for demonstration purposes, which contains a categorical feature named “lab”.
simpleDF.show(5)

+-----+----+------+------------------+
|color| lab|value1| value2|
+-----+----+------+------------------+
|green|good| 1|14.386294994851129|
| blue| bad| 8|14.386294994851129|
| blue| bad| 12|14.386294994851129|
|green|good| 15| 38.97187133755819|
|green|good| 12|14.386294994851129|
+-----+----+------+------------------+
only showing top 5 rows
Let us apply string indexer to a categorical variable named “lab” in “simpleDF” DataFrame.
from pyspark.ml.feature import StringIndexer
# Let us create an object of the class StringIndexer
lblindexer=StringIndexer().setInputCol("lab").setOutputCol("LabelIndexed")
# Let us transform
idxRes=lblindexer.fit(simpleDF).transform(simpleDF)
idxRes=idxRes.drop("value1","value2")
idxRes.show(5)
The output looks as follows. It can be seen that a new column has been added to the dataframe, which contains the encoded values for “good” and “bad”: 1.0 for good and 0.0 for bad (the StringIndexer assigns indices by label frequency, so the most frequent label gets index 0.0).
+-----+----+------------+
|color| lab|LabelIndexed|
+-----+----+------------+
|green|good| 1.0|
| blue| bad| 0.0|
| blue| bad| 0.0|
|green|good| 1.0|
|green|good| 1.0|
+-----+----+------------+
only showing top 5 rows
IndexToString
Sometimes we come across situations where it is necessary to convert the indexed values back to text. For this, Spark ML provides a class named “IndexToString”. To demonstrate it, let us use the “LabelIndexed” column of the “idxRes” dataframe which was created in the previous code snippet.
The LabelIndexed column maps 1.0 → good and 0.0 → bad. Now let us try and reverse this.
from pyspark.ml.feature import IndexToString
LabelReverse = IndexToString().setInputCol("LabelIndexed").setOutputCol("ReverseIndex")
LabelReverse.transform(idxRes).show(5)
The output looks as follows. It can be seen that a new column named “ReverseIndex” is now added to the dataframe, which reverses the values in the “LabelIndexed” column back to text, e.g. 1.0 → good and 0.0 → bad.
+-----+----+------------+------------+
|color| lab|LabelIndexed|ReverseIndex|
+-----+----+------------+------------+
|green|good| 1.0| good|
| blue| bad| 0.0| bad|
| blue| bad| 0.0| bad|
|green|good| 1.0| good|
|green|good| 1.0| good|
+-----+----+------------+------------+
only showing top 5 rows
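A side note (my own addition): IndexToString picks up the original labels from the metadata that StringIndexer attached to the “LabelIndexed” column. If that metadata were not available, the labels could be supplied explicitly, for example:
# Supplying the label ordering explicitly (here "bad" = 0.0 and "good" = 1.0, matching the indexer above)
LabelReverseExplicit = IndexToString(inputCol="LabelIndexed", outputCol="ReverseIndex", labels=["bad", "good"])
LabelReverseExplicit.transform(idxRes).show(5)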
Indexing within Vectors
Spark offers yet another class named “VectorIndexer”. The VectorIndexer identifies categorical variables within a set of features that has already been assembled into a vector and converts them into categorical features with zero-based category indices.
For the purpose of illustration, let us first create a new DataFrame named “dataln” with features in the form of Vectors.
from pyspark.ml.linalg import Vectors
dataln=spark.createDataFrame([(Vectors.dense(1,2,3),1),(Vectors.dense(2,5,6),2),(Vectors.dense(1,8,9),3)]).toDF("features","labels")
dataln.show()
The dataframe contains three rows and two columns: 1) “features”, a vector of three values in which the first element is a categorical variable, and 2) “labels”.
+-------------+------+
| features|labels|
+-------------+------+
|[1.0,2.0,3.0]| 1|
|[2.0,5.0,6.0]| 2|
|[1.0,8.0,9.0]| 3|
+-------------+------+
Now let us use the “VectorIndexer” to automatically identify the categorical feature and index it.
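The notebook covers this step in full; here is just a minimal sketch of how it might look (the output column name “indexed_features” and the maxCategories setting are my own illustrative choices):
from pyspark.ml.feature import VectorIndexer
# maxCategories=2 tells the indexer to treat any feature with at most 2 distinct
# values (here, only the first feature) as categorical and index it from zero
indexer = VectorIndexer(inputCol="features", outputCol="indexed_features", maxCategories=2)
indexer.fit(dataln).transform(dataln).show()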
There are a few more feature engineering techniques; please refer to my Python notebook for the same at this github link.
Conclusion
In this humble blog I have tried to cover some basic but widely used data pre-processing transformations offered by Spark ML, and I have demonstrated each of them with an illustration. However, there is a plethora of Spark tools to aid the feature engineering task; I will try to cover some of these in my next blog. I will also try to implement an end-to-end classification model in pyspark in my next blog.
Bibliography
Chambers, B., & Zaharia, M. (2018). Spark: The Definitive Guide. O’Reilly Media, Inc.