Running Sparkling Water on Google Cloud Dataproc

This section describes how to run Sparkling Water on Google Cloud Dataproc. It is meant to get you up and running as quickly as possible so you can try Sparkling Water out. For further use and for productionizing, some adjustments, such as initialization actions, will be required. In this tutorial we use Dataproc image version 2.0-debian10, which ships with Spark 3.1 and Scala 2.12.

  1. Install the Google Cloud SDK and log in to your account, for example:
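
# a sketch; installation steps vary by platform, see https://cloud.google.com/sdk/docs/install
gcloud init          # initialize the SDK and select a project
gcloud auth login    # authenticate with your Google account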

  2. Download Sparkling Water, for example:
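
# a sketch, assuming the Sparkling Water 3.44.0.3 release for Spark 3.1; adjust the version as needed
wget https://h2o-release.s3.amazonaws.com/sparkling-water/spark-3.1/3.44.0.3-1-3.1/sparkling-water-3.44.0.3-1-3.1.zip
unzip sparkling-water-3.44.0.3-1-3.1.zip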

  3. Create a Google Cloud Dataproc cluster.

DATAPROC_CLUSTER_NAME='sparkling-water-test'
GCP_REGION='europe-central2'

gcloud dataproc clusters create $DATAPROC_CLUSTER_NAME \
--region $GCP_REGION \
--image-version 2.0-debian10 \
--num-workers 3 \
--properties='^#^dataproc:pip.packages=tabulate==0.8.3,requests==2.21.0'
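
The ^#^ prefix switches the gcloud list delimiter to #, so the comma-separated pip package list is passed through as a single property value (see gcloud topic escaping for details).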

The rest of the tutorial shows the Scala variant of the example job; the example is also available in Python.

Set the variables.

# we need spark jars to compile the example
SPARK_JARS=$(echo "$SPARK_HOME"/jars/*.jar | tr ' ' ':')
# the assembly jar ships in the jars/ directory of the Sparkling Water distribution
SPARKLING_WATER_JAR='sparkling-water-assembly_2.12-3.44.0.3-1-3.1-all.jar'
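
This assumes SPARK_HOME points to a local Spark 3.1 installation; the path below is only an example:

export SPARK_HOME=/path/to/spark-3.1.3-bin-hadoop3.2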

Copy the example job source into a file named SparklingWaterGcpExampleJob.scala.

import java.net.URI
import ai.h2o.sparkling._
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession

object SparklingWaterGcpExampleJob extends App {
  // start the Spark session
  val spark = SparkSession.builder.appName("Sparkling water example").getOrCreate()
  import spark.implicits._
  // start the H2O cluster on top of Spark
  val hc = H2OContext.getOrCreate()

  val expectedClusterSize = 3
  val clusterSize = hc.getH2ONodes().length
  require(clusterSize == expectedClusterSize, s"H2O cluster should be of size $expectedClusterSize but is $clusterSize")

  // prepare the data
  spark.sparkContext.addFile("https://raw.githubusercontent.com/h2oai/sparkling-water/master/examples/smalldata/prostate/prostate.csv")
  val frame = H2OFrame(new URI("file://" + SparkFiles.get("prostate.csv")))
  val sparkDF = hc.asSparkFrame(frame).withColumn("CAPSULE", $"CAPSULE" cast "string")
  val Array(trainingDF, testingDF) = sparkDF.randomSplit(Array(0.8, 0.2))

  // train the model
  import ai.h2o.sparkling.ml.algos.H2OGBM
  val estimator = new H2OGBM().setLabelCol("CAPSULE")
  val model = estimator.fit(trainingDF)

  // run predictions
  model.transform(testingDF).collect()
}

Compile the code into a jar file, with the Spark and Sparkling Water jars on the classpath.

EXAMPLE_JOB_JAR='sparkling-water-gcp-example-job.jar'
scalac SparklingWaterGcpExampleJob.scala \
      -d $EXAMPLE_JOB_JAR \
      -classpath "$SPARKLING_WATER_JAR:$SPARK_JARS"
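
Optionally, verify that the example class ended up in the jar (the jar tool ships with the JDK):

jar tf $EXAMPLE_JOB_JAR | grep SparklingWaterGcpExampleJob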

Submit the job to the cluster.

gcloud dataproc jobs submit spark \
--class=SparklingWaterGcpExampleJob \
--cluster=$DATAPROC_CLUSTER_NAME \
--region=$GCP_REGION \
--jars "$SPARKLING_WATER_JAR,$EXAMPLE_JOB_JAR" \
--properties=spark.dynamicAllocation.enabled=false,spark.scheduler.minRegisteredResourcesRatio=1,spark.executor.instances=3
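
Dynamic allocation is disabled and the executor count is fixed at three because an H2O cluster cannot change its size at runtime; spark.scheduler.minRegisteredResourcesRatio=1 makes Spark wait until all executors are registered, so H2O forms a cluster of the expected size.

When you are done experimenting, delete the Dataproc cluster to avoid further charges:

gcloud dataproc clusters delete $DATAPROC_CLUSTER_NAME --region $GCP_REGION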