Autoencoder in Sparkling Water ============================== Autoencoder in Sparkling Water is based on H2O-3's Deep Learning algorithm and can be used for encoding an arbitrary list of features to the vector numerical values and for anomaly detection. Sparkling Water provides API for Autoencoder in Scala and Python. The following sections describe how to train and use the Autoencoder model in Sparkling Water in both languages. See also :ref:`parameters_H2OAutoEncoder` and :ref:`model_details_H2OAutoEncoderMOJOModel`. Prepare Sparkling Water Environment ----------------------------------- .. content-tabs:: .. tab-container:: Scala :title: Scala First, let's start Sparkling Shell as .. code:: shell ./bin/sparkling-shell Start H2O cluster inside the Spark environment .. code:: scala import ai.h2o.sparkling._ import java.net.URI val hc = H2OContext.getOrCreate() Parse the data using H2O and convert them to Spark Frame .. code:: scala import org.apache.spark.SparkFiles spark.sparkContext.addFile("https://raw.githubusercontent.com/h2oai/sparkling-water/master/examples/smalldata/prostate/prostate.csv") val rawSparkDF = spark.read.option("header", "true").option("inferSchema", "true").csv(SparkFiles.get("prostate.csv")) val sparkDF = rawSparkDF.withColumn("CAPSULE", $"CAPSULE" cast "string") val Array(trainingDF, testingDF) = sparkDF.randomSplit(Array(0.8, 0.2)) .. tab-container:: Python :title: Python First, let's start PySparkling Shell as .. code:: shell ./bin/pysparkling Start H2O cluster inside the Spark environment .. code:: python from pysparkling import * hc = H2OContext.getOrCreate() Parse the data using H2O and convert them to Spark Frame .. code:: python import h2o frame = h2o.import_file("https://raw.githubusercontent.com/h2oai/sparkling-water/master/examples/smalldata/prostate/prostate.csv") sparkDF = hc.asSparkFrame(frame) sparkDF = sparkDF.withColumn("CAPSULE", sparkDF.CAPSULE.cast("string")) [trainingDF, testingDF] = sparkDF.randomSplit([0.8, 0.2]) Use Case 1: Feature Encoding ---------------------------- ``H2OAutoEncoder`` is implemented as a Spark feature estimator, and thus can be used as a stage in a Spark pipeline. .. content-tabs:: .. tab-container:: Scala :title: Scala Create `H2OAutoEncoder`` and set input columns, the neural network architecture and other parameters (see :ref:`parameters_H2OAutoEncoder`). .. code:: scala import ai.h2o.sparkling.ml.features.H2OAutoEncoder val autoEncoder = new H2OAutoEncoder() autoEncoder.setInputCols(Array("AGE", "RACE", "DPROS", "DCAPS")) autoEncoder.setHidden(Array(100)) Define other pipeline stages. .. code:: scala import ai.h2o.sparkling.ml.algos.H2OGBM val gbm = new H2OGBM() gbm.setFeaturesCol(autoEncoder.getOutputCol()) gbm.setLabelCol("CAPSULE") Construct and fit the pipeline. .. code:: scala import org.apache.spark.ml.Pipeline val pipeline = new Pipeline().setStages(Array(autoEncoder, gbm)) val model = pipeline.fit(trainingDF) Now, you can score with the pipeline model. .. code:: scala val resultDF = model.transform(testingDF) resultDF.show(truncate=false) .. tab-container:: Python :title: Python Create `H2OAutoEncoder`` and set input columns, the neural network architecture and other parameters (see :ref:`parameters_H2OAutoEncoder`). .. code:: python from pysparkling.ml import H2OAutoEncoder autoEncoder = H2OAutoEncoder() autoEncoder.setInputCols(["AGE", "RACE", "DPROS", "DCAPS"]) autoEncoder.setHidden([100,]) Define other pipeline stages. .. code:: python from pysparkling.ml import H2OGBM gbm = H2OGBM() gbm.setFeaturesCols([autoEncoder.getOutputCol()]) gbm.setLabelCol("CAPSULE") Construct and fit the pipeline. .. code:: python from pyspark.ml import Pipeline pipeline = Pipeline(stages = [autoEncoder, gbm]) model = pipeline.fit(trainingDF) Now, you can score with the pipeline model. .. code:: python resultDF = model.transform(testingDF) resultDF.show(truncate=False) Use Case 2: Anomaly Detection ----------------------------- To use ``H2OAutoEncoder`` for the anomaly detection problem, ``H2OAutoEncoder`` or its MOJO model must be configured to produce a column with mean square errors ("MSE"). The errors are calculated from the output column and the original column, which represents a numerical input to the neural network of ``H2OAutoEncoder``. .. content-tabs:: .. tab-container:: Scala :title: Scala Create ``H2OAutoEncoder``, enable MSE column and optionally the original column .. code:: scala import ai.h2o.sparkling.ml.features.H2OAutoEncoder val autoEncoder = new H2OAutoEncoder() autoEncoder.setInputCols(Array("RACE", "DPROS", "DCAPS")) autoEncoder.setOutputCol("Output") autoEncoder.setWithOriginalCol(true) autoEncoder.setOriginalCol("Original") autoEncoder.setWithMSECol(true) autoEncoder.setMSECol("MSE") autoEncoder.setHidden(Array(3)) autoEncoder.setSplitRatio(0.8) Train the auto encoder model. .. code:: scala val model = autoEncoder.fit(trainingDF) Specify MSE threshold, score with the trained model and identify outliers .. code:: scala val threshold = 0.1 val scoredDF = model.transform(testingDF) import org.apache.spark.sql.functions.col val outliersDF = scoredDF.filter(col("MSE") > threshold) outliersDF.show(truncate=false) The overall performance of the auto encoder model can be checked by seeing training and validation metrics (MSE, RMSE). The validation metrics are available only if a validation data frame or split ration is set. .. code:: scala println(model.getTrainingMetrics()) println(model.getValidationMetrics()) The same thing can be achieved with an auto encoder MOJO model loaded from a file, but the MSE column (and the original column) needs to be explicitly enabled. .. code:: scala import ai.h2o.sparkling.ml.models.H2OAutoEncoderMOJOModel val model = H2OAutoEncoderMOJOModel.createFromMojo("path/to/auto_encoder_model.mojo") model.setOutputCol("Output") model.setWithOriginalCol(true) model.setOriginalCol("Original") model.setWithMSECol(true) model.setMSECol("MSE") .. tab-container:: Python :title: Python Create ``H2OAutoEncoder``, enable MSE column and optionally the original column .. code:: python from pysparkling.ml import H2OAutoEncoder autoEncoder = H2OAutoEncoder() autoEncoder.setInputCols(["RACE", "DPROS", "DCAPS"]) autoEncoder.setOutputCol("Output") autoEncoder.setWithOriginalCol(True) autoEncoder.setOriginalCol("Original") autoEncoder.setWithMSECol(True) autoEncoder.setMSECol("MSE") autoEncoder.setHidden([3,]) autoEncoder.setSplitRatio(0.8) Train the auto encoder model. .. code:: python model = autoEncoder.fit(trainingDF) Specify MSE threshold, score with the trained model and identify outliers. .. code:: python threshold = 0.1 scoredDF = model.transform(testingDF) from pyspark.sql.functions import col outliersDF = scoredDF.filter(col("MSE") > threshold) outliersDF.show(truncate=False) The overall performance of the auto encoder model can be checked by seeing training and validation metrics (MSE, RMSE). The validation metrics are available only if a validation data frame or split ration is set. .. code:: python print(model.getTrainingMetrics()) print(model.getValidationMetrics()) The same thing can be achieved with an auto encoder MOJO model loaded from a file, but the MSE column (and the original column) needs to be explicitly enabled. .. code:: python from pysparkling.ml import H2OAutoEncoderMOJOModel model = H2OAutoEncoderMOJOModel.createFromMojo("path/to/auto_encoder_model.mojo") model.setOutputCol("Output") model.setWithOriginalCol(True) model.setOriginalCol("Original") model.setWithMSECol(True) model.setMSECol("MSE")