Integration Tests

This section describes integration tests that are part of the Sparkling Water project.

The tests are performed for both Sparkling Water backend types (internal and external). Please see Sparkling Water Backends for more information about the backends.

Testing Environments

  • Local - corresponds to setting Spark MASTER variable to one of local, or local[*], or local-cluster[_,_,_] (local-cluster mode is just for testing purposes) values
  • Standalone cluster - the MASTER variable points to existing standalone Spark cluster spark://...
  • YARN cluster - the MASTER variable contains yarn-client or yarn-cluster values

Testing Scenarios

  1. Initialize H2O on top of Spark by running H2OContext.getOrCreate(spark) and verifying that H2O was properly initialized.
  2. Load data with help from the H2O API from various data sources:
  • local disk
  • HDFS
  • S3N
  1. Convert from RDD[T] to H2OFrame.
  2. Convert from DataFrame to H2OFrame.
  3. Convert from H2OFrame to RDD.
  4. Convert from H2OFrame to DataFrame.
  5. Integrate with H2O Algorithms using RDD as algorithm input.
  6. Integrate with MLlib Algorithms using H2OFrame as algorithm input (KMeans).
  7. Integrate with MLlib pipelines.

Integration Tests Example

The following code reflects the use cases listed above. The code is executed in all testing environments (if applicable). Spark 2.0+ is required:

  • local
  • standalone cluster
  • YARN
  1. Initialize H2O:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)
import h2oContext.implicits._
  1. Load data:
  • From the local disk:

    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().getOrCreate()
    import org.apache.spark.h2o._
    val h2oContext = H2OContext.getOrCreate(spark)
    
    import java.io.File
    val df: H2OFrame = new H2OFrame(new File("examples/smalldata/airlines/allyears2k_headers.zip"))
    

    Note: The file must be present on all nodes. Specifically in the case of the Sparkling Water internal backend, this must be present on all nodes with Spark. In the case of the Sparkling Water external backend, this must be present on all nodes with H2O.

  • From HDFS:

    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().getOrCreate()
    import org.apache.spark.h2o._
    val h2oContext = H2OContext.getOrCreate(spark)
    
    val path = "hdfs://mr-0xd6.0xdata.loc/datasets/airlines_all.csv"
    val uri = new java.net.URI(path)
    val airlinesHF = new H2OFrame(uri)
    
  • From S3N:

    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().getOrCreate()
    import org.apache.spark.h2o._
    val h2oContext = H2OContext.getOrCreate(spark)
    
    val path = "s3n://h2o-airlines-unpacked/allyears2k.csv"
    val uri = new java.net.URI(path)
    val airlinesHF = new H2OFrame(uri)
    

    Note: Spark/H2O needs to know the AWS credentials specified in core-site.xml. The credentials are passed via HADOOP_CONF_DIR, which points to a configuration directory with core-site.xml.

  1. Convert from RDD[T] to H2OFrame:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)

val rdd = sc.parallelize(1 to 1000, 100).map( v => IntHolder(Some(v)))
val hf: H2OFrame = h2oContext.asH2OFrame(rdd)
  1. Convert from DataFrame to H2OFrame:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)

import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 1000, 100).map(v => IntHolder(Some(v))).toDF
val hf = h2oContext.asH2OFrame(df)
  1. Convert from H2OFrame to RDD[T]:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)

val rdd = spark.sparkContext.parallelize(1 to 1000, 100).map(v => IntHolder(Some(v)))
val hf: H2OFrame = h2oContext.asH2OFrame(rdd)
val newRdd = h2oContext.asRDD[IntHolder](hf)
  1. Convert from H2OFrame to DataFrame:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)

import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 1000, 100).map(v => IntHolder(Some(v))).toDF
val hf = h2oContext.asH2OFrame(df)
val newRdd = h2oContext.asDataFrame(hf)
  1. Integrate with H2O Algorithms using RDD as algorithm input:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)
import h2oContext.implicits._
import org.apache.spark.examples.h2o._

val path = "examples/smalldata/prostate.csv"
val prostateText = spark.sparkContext.textFile(path)
val prostateRDD = prostateText.map(_.split(",")).map(row => ProstateParse(row))
import _root_.hex.tree.gbm.GBM
import _root_.hex.tree.gbm.GBMModel.GBMParameters
val train: H2OFrame = prostateRDD
val gbmParams = new GBMParameters()
gbmParams._train = train
gbmParams._response_column = "CAPSULE"
gbmParams._ntrees = 10
val gbmModel = new GBM(gbmParams).trainModel.get
  1. Integrate with MLlib algorithms:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)
import org.apache.spark.examples.h2o._

import java.io.File
val path = "examples/smalldata/prostate.csv"
val prostateHF = new H2OFrame(new File(path))
val prostateRDD = h2oContext.asRDD[Prostate](prostateHF)
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
val train = prostateRDD.map( v => Vectors.dense(v.CAPSULE.get*1.0, v.AGE.get*1.0, v.DPROS.get*1.0,v.DCAPS.get*1.0, v.GLEASON.get*1.0))
val clusters = KMeans.train(train, 5, 20)