Running Sparkling Water on Google Cloud Dataproc

This section describes how to run Sparkling Water on Google Cloud Dataproc.

Perform the following steps to start Sparkling Water H2OContext on Cloud Dataproc.

Note: Be sure to allocate enough resources to handle your data. As a minimum recommendation, you should allocate 4 times memory of the size of your data in H2O.

  1. Install the Google SDK.

  2. Install Daisy. The path for Daisy will be required in a later step.

  3. After Daisy is installed, download and save Google’s generate_custom_image.py. This custom image will be referenced in a later step.

  4. Copy and save the following customization script. This will be required in a later step.

#!/bin/bash

apt-get update
apt-get install -y python-dev python-pip jq

cd /usr/lib/
wget http://h2o-release.s3.amazonaws.com/sparkling-water/spark-2.1/3.32.0.1-2-2.1/sparkling-water-3.32.0.1-2-2.1.zip
unzip sparkling-water-3.32.0.1-2-2.1.zip

pip install pip==9.0.3
pip install requests==2.18.4
pip install tabulate
pip install future
pip install colorama
pip install scikit-learn==0.19.1
pip install https://h2o-release.s3.amazonaws.com/h2o/rel-zermelo/1/Python/h2o-3.32.0.1-py2.py3-none-any.whl
pip install --upgrade google-cloud-bigquery
pip install --upgrade google-cloud-storage

pip install h2o_pysparkling_2.1
  1. Create a Dataproc Custom Image as defined here. Note that this references the Daisy path, the custom image name, and the customization script from previous steps. The code will be similar to the following:

python generate_custom_image.py \
 --image-name <custom_image_name> \
 --dataproc-version <Cloud Dataproc version (example: "1.2.22")> \
 --customization-script <local path to your custom script> \
 --daisy-path <local path to the downloaded daisy binary> \
 --zone <Compute Engine zone to use for the location of the temporary VM> \
 --gcs-bucket <URI (gs://bucket-name) of a Cloud Storage bucket in your project> \
 [--no-smoke-test: <"true" or "false" (default: "false")>
  1. Copy and save the following initialization script into a GCS bucket that you have access to. This will be required when you create your cluster.

#!/bin/bash

METADATA_ROOT='http://metadata/computeMetadata/v1/instance/attributes'

CLUSTER_NAME=$(curl -f -s -H Metadata-Flavor:Google \
${METADATA_ROOT}/dataproc-cluster-name)

cat << EOF > /usr/local/bin/await_cluster_and_run_command.sh
#!/bin/bash
# Helper to get current cluster state.
function get_cluster_state() {
        echo \$(gcloud dataproc clusters describe ${CLUSTER_NAME} | \
        grep -A 1 "^status:" | grep "state:" | cut -d ':' -f 2)
}
# Helper which waits for RUNNING state before issuing the command.
function await_and_submit() {
        local cluster_state=\$(get_cluster_state)
        echo "Current cluster state: \${cluster_state}"
        while [[ "\${cluster_state}" != 'RUNNING' ]]; do
        echo "Sleeping to await cluster health..."
        sleep 5
        local cluster_state=\$(get_cluster_state)
        if [[ "\${cluster_state}" == 'ERROR' ]]; then
          echo "Giving up due to cluster state '\${cluster_state}'"
          exit 1
        fi
        done

        echo "Changing Spark Configurations"
        sudo sed -i 's/spark.dynamicAllocation.enabled true/spark.dynamicAllocation.enabled false/g' /usr/lib/spark/conf/spark-defaults.conf
        sudo sed -i 's/spark.executor.instances 10000/# spark.executor.instances 10000/g' /usr/lib/spark/conf/spark-defaults.conf
        sudo sed -i 's/spark.executor.cores.*/# removing unnecessary limits to executor cores/g' /usr/lib/spark/conf/spark-defaults.conf
        sudo sed -i 's/^spark.executor.memory.*/# removing unnecessary limits to executor memory/g' /usr/lib/spark/conf/spark-defaults.conf
        sudo echo "spark.executor.instances $(gcloud dataproc clusters describe ${CLUSTER_NAME} | grep "numInstances:" | tail -1 | sed "s/.*numInstances: //g")" >> /usr/lib/spark/conf/spark-defaults.conf
        sudo echo "spark.executor.cores $(gcloud compute machine-types describe $(gcloud dataproc clusters describe ${CLUSTER_NAME} | grep "machineTypeUri" | tail -1 | sed 's/.*machineTypeUri: //g') | grep "guestCpus" | sed 's/guestCpus: //g')" >> /usr/lib/spark/conf/spark-defaults.conf
        sudo echo "spark.executor.memory $(($(gcloud compute machine-types describe $(gcloud dataproc clusters describe h2o-dataproc | grep "machineTypeUri" | tail -1 | sed 's/.*machineTypeUri: //g') | grep "memoryMb:" | sed 's/memoryMb: //g') * 65 / 100))m" >> /usr/lib/spark/conf/spark-defaults.conf
        echo "Successfully Changed spark-defaults.conf"

        cat /usr/lib/spark/conf/spark-defaults.conf
}

await_and_submit
EOF

chmod 750 /usr/local/bin/await_cluster_and_run_command.sh
nohup /usr/local/bin/await_cluster_and_run_command.sh &>> \
/var/log/master-post-init.log &
  1. After the image is created and the script is saved, create the cluster as defined here using the script created above. The only required flags are image and --initialization-actions.

gcloud dataproc clusters create sparklingwaterdataproc \
 --image=<myswdataprocimage> \
 --initialization-actions=gs://<bucket>/<initialization_script.sh>

Upon successful completion, you will have a Dataproc running Sparkling Water. You can run jobs now, for example:

gcloud dataproc jobs submit pyspark \
  --cluster cluster-name --region region \
  sample-script.py

Note: Dataproc does not automatically enable Spark logs. Refer to the following Stackoverflow answers:

Sample Script for Sparkling Water Job

Below is a sample script for running a Sparkling Water job. Edit the arguments to match your bucket and GCP setup.

import h2o
from h2o.automl import H2OAutoML
from pyspark.sql import SparkSession
from pysparkling import *

spark = SparkSession.builder.appName("SparklingWaterApp").getOrCreate()
hc = H2OContext.getOrCreate()

bucket = "h2o-bq-large-dataset"
train_path = "demos/cc_train.csv"
test_path = "demos/cc_test.csv"
y = "DEFAULT_PAYMENT_NEXT_MONTH"
is_classification = True

drop_cols = []
aml_args = {"max_runtime_secs": 120}

train_data = spark.read\
                  .options(header='true', inferSchema='true')\
                  .csv("gs://{}/{}".format(bucket, train_path))
test_data = spark.read\
                 .options(header='true', inferSchema='true')\
                 .csv("gs://{}/{}".format(bucket, test_path))

print("CREATING H2O FRAME")
training_frame = hc.asH2OFrame(train_data)
test_frame = hc.asH2OFrame(test_data)

x = training_frame.columns
x.remove(y)

for col in drop_cols:
    x.remove(col)

if is_classification:
    training_frame[y] = training_frame[y].asfactor()
else:
    print("REGRESSION: Not setting target column as factor")

print("TRAINING H2OAUTOML")
aml = H2OAutoML(**aml_args)
aml.train(x=x, y=y, training_frame=training_frame)

print(aml.leaderboard)

print('SUCCESS')