Autoencoder in Sparkling Water

Autoencoder in Sparkling Water is based on H2O-3’s Deep Learning algorithm. It can be used to encode an arbitrary list of features into a numerical vector and to detect anomalies. Sparkling Water provides an Autoencoder API in Scala and Python. The following sections describe how to train and use the Autoencoder model in Sparkling Water in both languages. See also Parameters of H2OAutoEncoder.

Prepare Sparkling Water Environment

Scala

First, start Sparkling Shell:

./bin/sparkling-shell

Start an H2O cluster inside the Spark environment.

import ai.h2o.sparkling._
val hc = H2OContext.getOrCreate()

Download the data and load it into a Spark DataFrame with Spark's CSV reader.

import org.apache.spark.SparkFiles
spark.sparkContext.addFile("https://raw.githubusercontent.com/h2oai/sparkling-water/master/examples/smalldata/prostate/prostate.csv")
val rawSparkDF = spark.read.option("header", "true").option("inferSchema", "true").csv(SparkFiles.get("prostate.csv"))
val sparkDF = rawSparkDF.withColumn("CAPSULE", $"CAPSULE" cast "string")
val Array(trainingDF, testingDF) = sparkDF.randomSplit(Array(0.8, 0.2))

Python

First, start PySparkling Shell:

./bin/pysparkling

Start an H2O cluster inside the Spark environment.

from pysparkling import *
hc = H2OContext.getOrCreate()

Parse the data using H2O and convert it to a Spark DataFrame.

import h2o
frame = h2o.import_file("https://raw.githubusercontent.com/h2oai/sparkling-water/master/examples/smalldata/prostate/prostate.csv")
sparkDF = hc.asSparkFrame(frame)
sparkDF = sparkDF.withColumn("CAPSULE", sparkDF.CAPSULE.cast("string"))
[trainingDF, testingDF] = sparkDF.randomSplit([0.8, 0.2])

Use Case 1: Feature Encoding

H2OAutoEncoder is implemented as a Spark feature estimator, and thus can be used as a stage in a Spark pipeline.

Scala

Create an H2OAutoEncoder and set the input columns, the neural network architecture, and other parameters (see Parameters of H2OAutoEncoder).

import ai.h2o.sparkling.ml.features.H2OAutoEncoder
val autoEncoder = new H2OAutoEncoder()
autoEncoder.setInputCols(Array("AGE", "RACE", "DPROS", "DCAPS"))
autoEncoder.setHidden(Array(100))

Define other pipeline stages.

import ai.h2o.sparkling.ml.algos.H2OGBM
val gbm = new H2OGBM()
gbm.setFeaturesCol(autoEncoder.getOutputCol())
gbm.setLabelCol("CAPSULE")

Construct and fit the pipeline.

import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline().setStages(Array(autoEncoder, gbm))
val model = pipeline.fit(trainingDF)

Now, you can score with the pipeline model.

val resultDF = model.transform(testingDF)
resultDF.show(truncate=false)

Python

Create an H2OAutoEncoder and set the input columns, the neural network architecture, and other parameters (see Parameters of H2OAutoEncoder).

from pysparkling.ml import H2OAutoEncoder
autoEncoder = H2OAutoEncoder()
autoEncoder.setInputCols(["AGE", "RACE", "DPROS", "DCAPS"])
autoEncoder.setHidden([100,])

Define other pipeline stages.

from pysparkling.ml import H2OGBM
gbm = H2OGBM()
gbm.setFeaturesCols([autoEncoder.getOutputCol()])
gbm.setLabelCol("CAPSULE")

Construct and fit the pipeline.

from pyspark.ml import Pipeline
pipeline = Pipeline(stages = [autoEncoder, gbm])
model = pipeline.fit(trainingDF)

Now, you can score with the pipeline model.

resultDF = model.transform(testingDF)
resultDF.show(truncate=False)

Use Case 2: Anomaly Detection

To use H2OAutoEncoder for anomaly detection, H2OAutoEncoder or its MOJO model must be configured to produce a column with the mean squared error (“MSE”). The error is calculated from the output column and the original column, which represents the numerical input to the neural network of H2OAutoEncoder.
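The relationship between the original column, the output column, and the MSE column can be illustrated with a minimal plain-Python sketch. It assumes the error is the plain mean of squared element-wise differences between the two vectors; the exact computation inside H2O (for example, any normalization of the inputs) is not shown here. The function name `reconstruction_mse` and the sample values are hypothetical and are not part of the Sparkling Water API or the prostate data set.

```python
from typing import Sequence


def reconstruction_mse(original: Sequence[float], reconstructed: Sequence[float]) -> float:
    """Mean of squared element-wise differences between two equal-length vectors.

    Hypothetical helper for illustration only; it is not a Sparkling Water function.
    """
    if len(original) != len(reconstructed):
        raise ValueError("vectors must have the same length")
    if len(original) == 0:
        raise ValueError("vectors must be non-empty")
    squared_errors = [(o - r) ** 2 for o, r in zip(original, reconstructed)]
    return sum(squared_errors) / len(squared_errors)


# Made-up values standing in for one row of the "Original" and "Output" columns.
original_row = [1.0, 0.0, 0.5]
output_row = [0.9, 0.1, 0.5]

mse = reconstruction_mse(original_row, output_row)
print(mse)
```

A row that the network reconstructs poorly yields a large value, which is why the MSE column can serve as an anomaly score.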

Scala

Create an H2OAutoEncoder, enable the MSE column and, optionally, the original column.

import ai.h2o.sparkling.ml.features.H2OAutoEncoder
val autoEncoder = new H2OAutoEncoder()
autoEncoder.setInputCols(Array("RACE", "DPROS", "DCAPS"))
autoEncoder.setOutputCol("Output")
autoEncoder.setWithOriginalCol(true)
autoEncoder.setOriginalCol("Original")
autoEncoder.setWithMSECol(true)
autoEncoder.setMSECol("MSE")
autoEncoder.setHidden(Array(3))
autoEncoder.setSplitRatio(0.8)

Train the autoencoder model.

val model = autoEncoder.fit(trainingDF)

Specify an MSE threshold, score with the trained model, and identify outliers.

val threshold = 0.1
val scoredDF = model.transform(testingDF)
import org.apache.spark.sql.functions.col
val outliersDF = scoredDF.filter(col("MSE") > threshold)
outliersDF.show(truncate=false)

The overall performance of the autoencoder model can be checked by inspecting the training and validation metrics (MSE, RMSE). The validation metrics are available only if a validation data frame or a split ratio is set.

println(model.getTrainingMetrics())
println(model.getValidationMetrics())

The same can be achieved with an autoencoder MOJO model loaded from a file, but the MSE column (and the original column) must be enabled explicitly.

import ai.h2o.sparkling.ml.models.H2OAutoEncoderMOJOModel
val model = H2OAutoEncoderMOJOModel.createFromMojo("path/to/auto_encoder_model.mojo")
model.setOutputCol("Output")
model.setWithOriginalCol(true)
model.setOriginalCol("Original")
model.setWithMSECol(true)
model.setMSECol("MSE")

Python

Create an H2OAutoEncoder, enable the MSE column and, optionally, the original column.

from pysparkling.ml import H2OAutoEncoder
autoEncoder = H2OAutoEncoder()
autoEncoder.setInputCols(["RACE", "DPROS", "DCAPS"])
autoEncoder.setOutputCol("Output")
autoEncoder.setWithOriginalCol(True)
autoEncoder.setOriginalCol("Original")
autoEncoder.setWithMSECol(True)
autoEncoder.setMSECol("MSE")
autoEncoder.setHidden([3,])
autoEncoder.setSplitRatio(0.8)

Train the autoencoder model.

model = autoEncoder.fit(trainingDF)

Specify an MSE threshold, score with the trained model, and identify outliers.

threshold = 0.1
scoredDF = model.transform(testingDF)
from pyspark.sql.functions import col
outliersDF = scoredDF.filter(col("MSE") > threshold)
outliersDF.show(truncate=False)
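The threshold of 0.1 used above is a fixed value. One possible alternative, sketched here in plain Python, is to derive the threshold from the distribution of errors observed on data considered normal, for example a high percentile. The helper names `percentile_threshold` and `flag_outliers` and all sample values are hypothetical; they are not part of Sparkling Water. On a Spark DataFrame, a comparable value could likely be obtained with `DataFrame.approxQuantile` on the MSE column.

```python
import math
from typing import List, Sequence


def percentile_threshold(errors: Sequence[float], percent: int) -> float:
    """Nearest-rank percentile of the given errors (percent in 1..100)."""
    if len(errors) == 0:
        raise ValueError("errors must be non-empty")
    if not 1 <= percent <= 100:
        raise ValueError("percent must be between 1 and 100")
    ordered = sorted(errors)
    # 1-based nearest rank: the smallest rank covering `percent` of the values.
    rank = math.ceil(percent * len(ordered) / 100)
    return ordered[rank - 1]


def flag_outliers(errors: Sequence[float], threshold: float) -> List[int]:
    """Indices of the errors that are strictly greater than the threshold."""
    return [i for i, e in enumerate(errors) if e > threshold]


# Made-up reconstruction errors standing in for the MSE column of scored training rows.
reference_errors = [0.01, 0.02, 0.015, 0.03, 0.012, 0.018, 0.025, 0.011, 0.4, 0.022]
derived_threshold = percentile_threshold(reference_errors, 90)

# Made-up errors for newly scored rows.
new_errors = [0.02, 0.5, 0.03, 0.031]
print(derived_threshold)
print(flag_outliers(new_errors, derived_threshold))
```

The strict comparison mirrors the `col("MSE") > threshold` filter above. Which percentile is appropriate depends on how many anomalies are expected in the data.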

The overall performance of the autoencoder model can be checked by inspecting the training and validation metrics (MSE, RMSE). The validation metrics are available only if a validation data frame or a split ratio is set.

print(model.getTrainingMetrics())
print(model.getValidationMetrics())

The same can be achieved with an autoencoder MOJO model loaded from a file, but the MSE column (and the original column) must be enabled explicitly.

from pysparkling.ml import H2OAutoEncoderMOJOModel
model = H2OAutoEncoderMOJOModel.createFromMojo("path/to/auto_encoder_model.mojo")
model.setOutputCol("Output")
model.setWithOriginalCol(True)
model.setOriginalCol("Original")
model.setWithMSECol(True)
model.setMSECol("MSE")