Spark Pipelines: Elegant Yet Powerful

Eric Xu is a Data Scientist and Rails Developer at Outbrain who participated in the Spark Lab workshop in New York.

Introduction

We've all suffered through the experience of reopening a machine learning project and trying to trace back our thought process. Oftentimes it feels like a jungle where dozens of feature engineering steps criss-cross with a grab bag of hand-tuned models. If we can't easily follow our own code, how can others?

That's why I was excited when I learned about Spark's Machine Learning (ML) Pipelines during the Insight Spark Lab. The Pipeline API, introduced in Spark 1.2, is a high-level API for MLlib. Inspired by the popular implementation in scikit-learn, Pipelines are designed to facilitate the creation, tuning, and inspection of practical ML workflows. In other words, they let us focus on solving a machine learning task instead of spending time organizing code.

Why Use Pipelines?

Typically during the exploratory stages of a machine learning problem, we find ourselves iterating through dozens, if not hundreds, of features and model combinations. Our thinking process could resemble something like this:

1. We start out with three features as inputs to our model
   - Features A, B, C
   - Model X
2. Add a new feature
   - Features A, B, C, D (new)
   - Model X
3. Normalize an existing feature
   - Features A, B (normalized), C, D
   - Model X
4. Use a different set of model parameters
   - Features A, B (normalized), C, D
   - Model X, parameters v2
5. Use the predictions from the first model as features for a second model
   - Features A, B (normalized)
   - Model X, parameters v2
   - Features C, D, predictions from Model X
   - Model Y



Before long, our Jupyter notebook is filled with spaghetti code that takes up hundreds of cells. Ensuring that our training and test data go through an identical process is manageable, but it tends to be tedious and error-prone.

A better solution is to wrap each combination of steps with a Pipeline. This gives us a declarative interface where it's easy to see the entire data extraction, transformation, and model training workflow.

A Spark Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage.
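To make the distinction concrete, here is a minimal sketch, assuming a DataFrame df with a "review" text column and a numeric "label" column (df, words_df, features_df, and lr_model are illustrative names): a Transformer implements transform(), which maps one DataFrame to another, while an Estimator implements fit(), which returns a Model that is itself a Transformer.

from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

# Transformers: transform() maps one DataFrame to a new DataFrame
tok = Tokenizer(inputCol="review", outputCol="words")
words_df = tok.transform(df)             # appends a "words" column

htf = HashingTF(inputCol="words", outputCol="features")
features_df = htf.transform(words_df)    # appends a "features" column

# Estimator: fit() learns from the data and returns a Model, which is
# itself a Transformer that appends a "prediction" column
lr = LogisticRegression(maxIter=10)
lr_model = lr.fit(features_df)           # uses the assumed "label" column
predictions = lr_model.transform(features_df)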

The code below demonstrates how multiple Transformers and Estimators can be bundled to create a complex workflow. At Insight Data Labs, I was provided with several data sources ranging from several hundred gigabytes to a terabyte, and I decided to work with the Amazon reviews dataset. The raw data consists of restaurant reviews (strings) and ratings (integers), like so:

Review: "If you're a vegetarian, there are just some places you shouldn't go to. Sausage factory. Pig roast. Republican National Convention. And Applebee's."
Rating: 1

Review: "Unless you're going for cocktails and appetizers, don't try to actually eat anything at this Applebee's. Try Chuck E. Cheese upstairs. They may have a cheese pizza option for vegetarians. They just might."
Rating: 3

Review: "No utensils given unless we asked. Was I supposed to dunk my face into my appetizer like a dog?"
Rating: 1



During the feature engineering process, the raw reviews are tokenized, and text features are extracted from the resulting words using both the HashingTF and Word2Vec algorithms. The ratings are one-hot encoded with a OneHotEncoder. The feature engineering results are then combined using the VectorAssembler before being passed to a Logistic Regression model.

from pyspark.ml import Pipeline  
from pyspark.ml.feature import *  
from pyspark.ml.classification import LogisticRegression

# Configure pipeline stages
tok = Tokenizer(inputCol="review", outputCol="words")  
htf = HashingTF(inputCol="words", outputCol="tf", numFeatures=200)  
w2v = Word2Vec(inputCol="review", outputCol="w2v")  
ohe = OneHotEncoder(inputCol="rating", outputCol="rc")  
va = VectorAssembler(inputCols=["tf", "w2v", "rc"], outputCol="features")  
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Build the pipeline
pipeline = Pipeline(stages=[tok, htf, w2v, ohe, va, lr])

# Fit the pipeline
model = pipeline.fit(train_df)

# Make a prediction
prediction = model.transform(test_df)  

This DAG diagram visualizes the structure of the Pipeline and all of its stages. Blue pentagons represent Transformers, the yellow diamond represents an Estimator, and the boxes represent DataFrames with the current column names flowing through the Pipeline.
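Even without the diagram, the fitted PipelineModel can be inspected directly. A quick sanity check might look like this (a sketch reusing the model and prediction variables from the code above):

# List the fitted stages in the order they were declared
print(model.stages)

# Spot-check a few predictions alongside the original reviews
prediction.select("review", "prediction").show(5)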

Custom Transformers

The Spark community is quickly adding new feature transformers and algorithms for the Pipeline API with each version release. But what if we want to do something that isn't available out of the box, like counting the number of emojis in a block of text? It turns out it's not that difficult to extend the Transformer class and create our own custom Transformers.

The basic rules to follow are that a Transformer needs to:
1. implement the transform method (in pyspark, by overriding _transform)
2. specify an inputCol and outputCol
3. accept a DataFrame as input and return a DataFrame as output

The following code snippet demonstrates a naive implementation of a word count Transformer.

from pyspark.ml.util import keyword_only
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# Create a custom word count transformer class
class MyWordCounter(Transformer, HasInputCol, HasOutputCol):  
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(MyWordCounter, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]

        # Define transformer logic
        def f(s):
            return len(s.split(' '))
        t = LongType()

        return dataset.withColumn(out_col, udf(f, t)(in_col))

# Instantiate the new word count transformer
wc = MyWordCounter(inputCol="review", outputCol="wc")  

We can now treat MyWordCounter like any other Transformer and add it as a stage to our Pipeline.
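For example, the new word count column can be fed into the VectorAssembler alongside the other features. The sketch below reuses the stage variables defined earlier; the "wc" column name comes from the transformer instantiated above:

# Add the word count as an extra feature and rebuild the pipeline
va = VectorAssembler(inputCols=["tf", "w2v", "rc", "wc"], outputCol="features")
pipeline = Pipeline(stages=[tok, htf, w2v, ohe, wc, va, lr])
model = pipeline.fit(train_df)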

Bottom Line

Pipelines are a simple and effective way to manage complex machine learning workflows without tearing our hair out. Their power stands out even more when we get to cross-validation for hyperparameter tuning. Overall, the Pipeline API is a major step in making machine learning scalable, easy, and enjoyable.
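Because a Pipeline is itself an Estimator, it can be dropped straight into Spark's CrossValidator. The sketch below reuses the pipeline, htf, lr, and train_df from earlier and assumes a binary label, so a BinaryClassificationEvaluator stands in as the metric; the grid values are illustrative:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Grid of hyperparameters to search over
grid = (ParamGridBuilder()
        .addGrid(htf.numFeatures, [100, 200, 400])
        .addGrid(lr.regParam, [0.1, 0.01])
        .build())

# Cross-validate the whole Pipeline, not just the final model
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=3)

cv_model = cv.fit(train_df)
best_model = cv_model.bestModel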

Already a data scientist or engineer?

Join us for a two-day advanced Apache Spark Lab led by tech industry experts.

Interested in transitioning to a career in data engineering?

Learn more about the Insight Data Engineering Fellows Program in New York and Silicon Valley.
