Running Sparkling Water on Google Cloud Dataproc
This section describes how to run Sparkling Water on Google Cloud Dataproc.
Perform the following steps to start a Sparkling Water H2OContext on Cloud Dataproc.
Note: Be sure to allocate enough resources to handle your data. As a minimum recommendation, allocate memory equal to four times the size of your data in H2O.
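The sizing rule in the note can be checked with quick arithmetic. The sketch below is illustrative only: the function name and the example dataset size are assumptions, and the factor of 4 is the minimum recommendation from the note, not a guarantee that a given workload will fit.

```python
def recommended_cluster_memory_gb(data_size_gb, factor=4):
    """Return the minimum total memory (in GB) suggested for a dataset.

    `factor` defaults to 4, the minimum multiple recommended in the note above.
    """
    if data_size_gb < 0:
        raise ValueError("data_size_gb must be non-negative")
    return data_size_gb * factor


# Hypothetical example: a 25 GB dataset loaded into H2O.
print(recommended_cluster_memory_gb(25))
```

Treat the result as a lower bound when choosing machine types and worker counts for the cluster.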
Install the Google Cloud SDK.
Install Daisy. The path for Daisy will be required in a later step.
After Daisy is installed, download and save Google’s generate_custom_image.py. This script is used in a later step to build the custom image.
Copy and save the following customization script. This will be required in a later step.
#!/bin/bash

apt-get update
apt-get install -y python-dev python-pip jq

cd /usr/lib/
wget http://h2o-release.s3.amazonaws.com/sparkling-water/spark-3.0/3.34.0.4-1-3.0/sparkling-water-3.34.0.4-1-3.0.zip
unzip sparkling-water-3.34.0.4-1-3.0.zip

pip install pip==9.0.3
pip install requests==2.18.4
pip install tabulate
pip install future
pip install scikit-learn==0.19.1
pip install https://h2o-release.s3.amazonaws.com/h2o/rel-zizler/4/Python/h2o-3.34.0.4-py2.py3-none-any.whl
pip install --upgrade google-cloud-bigquery
pip install --upgrade google-cloud-storage
pip install h2o_pysparkling_3.0
Create a Dataproc Custom Image as defined here. Note that this references the Daisy path, the custom image name, and the customization script from previous steps. The code will be similar to the following:
python generate_custom_image.py \
    --image-name <custom_image_name> \
    --dataproc-version <Cloud Dataproc version (example: "1.2.22")> \
    --customization-script <local path to your custom script> \
    --daisy-path <local path to the downloaded daisy binary> \
    --zone <Compute Engine zone to use for the location of the temporary VM> \
    --gcs-bucket <URI (gs://bucket-name) of a Cloud Storage bucket in your project> \
    [--no-smoke-test: <"true" or "false" (default: "false")>]
Copy and save the following initialization script into a GCS bucket that you have access to. This will be required when you create your cluster.
#!/bin/bash

METADATA_ROOT='http://metadata/computeMetadata/v1/instance/attributes'

CLUSTER_NAME=$(curl -f -s -H Metadata-Flavor:Google \
  ${METADATA_ROOT}/dataproc-cluster-name)

cat << EOF > /usr/local/bin/await_cluster_and_run_command.sh
#!/bin/bash

# Helper to get current cluster state.
function get_cluster_state() {
  echo \$(gcloud dataproc clusters describe ${CLUSTER_NAME} | \
    grep -A 1 "^status:" | grep "state:" | cut -d ':' -f 2)
}

# Helper which waits for RUNNING state before issuing the command.
function await_and_submit() {
  local cluster_state=\$(get_cluster_state)
  echo "Current cluster state: \${cluster_state}"
  while [[ "\${cluster_state}" != 'RUNNING' ]]; do
    echo "Sleeping to await cluster health..."
    sleep 5
    local cluster_state=\$(get_cluster_state)
    if [[ "\${cluster_state}" == 'ERROR' ]]; then
      echo "Giving up due to cluster state '\${cluster_state}'"
      exit 1
    fi
  done

  echo "Changing Spark Configurations"
  sudo sed -i 's/spark.dynamicAllocation.enabled true/spark.dynamicAllocation.enabled false/g' /usr/lib/spark/conf/spark-defaults.conf
  sudo sed -i 's/spark.executor.instances 10000/# spark.executor.instances 10000/g' /usr/lib/spark/conf/spark-defaults.conf
  sudo sed -i 's/spark.executor.cores.*/# removing unnecessary limits to executor cores/g' /usr/lib/spark/conf/spark-defaults.conf
  sudo sed -i 's/^spark.executor.memory.*/# removing unnecessary limits to executor memory/g' /usr/lib/spark/conf/spark-defaults.conf
  sudo echo "spark.executor.instances $(gcloud dataproc clusters describe ${CLUSTER_NAME} | grep "numInstances:" | tail -1 | sed "s/.*numInstances: //g")" >> /usr/lib/spark/conf/spark-defaults.conf
  sudo echo "spark.executor.cores $(gcloud compute machine-types describe $(gcloud dataproc clusters describe ${CLUSTER_NAME} | grep "machineTypeUri" | tail -1 | sed 's/.*machineTypeUri: //g') | grep "guestCpus" | sed 's/guestCpus: //g')" >> /usr/lib/spark/conf/spark-defaults.conf
  sudo echo "spark.executor.memory $(($(gcloud compute machine-types describe $(gcloud dataproc clusters describe ${CLUSTER_NAME} | grep "machineTypeUri" | tail -1 | sed 's/.*machineTypeUri: //g') | grep "memoryMb:" | sed 's/memoryMb: //g') * 65 / 100))m" >> /usr/lib/spark/conf/spark-defaults.conf
  echo "Successfully Changed spark-defaults.conf"
  cat /usr/lib/spark/conf/spark-defaults.conf
}

await_and_submit
EOF

chmod 750 /usr/local/bin/await_cluster_and_run_command.sh
nohup /usr/local/bin/await_cluster_and_run_command.sh &>> \
  /var/log/master-post-init.log &
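The initialization script appends three settings to spark-defaults.conf: the executor count from numInstances, the executor cores from the machine type's guestCpus, and the executor memory as 65% of the machine type's memoryMb. The Python sketch below reproduces only that arithmetic so the resulting values can be previewed before a cluster is created. The function name and the example numbers are assumptions for illustration; the script itself reads the real values from gcloud.

```python
def spark_executor_settings(num_instances, guest_cpus, memory_mb):
    """Mirror the values the initialization script appends to spark-defaults.conf."""
    return {
        "spark.executor.instances": str(num_instances),
        "spark.executor.cores": str(guest_cpus),
        # Bash integer arithmetic evaluates memoryMb * 65 / 100 left to right,
        # which matches multiplying first and then using floor division.
        "spark.executor.memory": "{}m".format(memory_mb * 65 // 100),
    }


# Hypothetical cluster: 3 instances, 4 vCPUs and 15360 MB of memory per machine.
settings = spark_executor_settings(3, 4, 15360)
for key, value in settings.items():
    print(key, value)
```

With these example inputs the memory setting comes out to 9984m, leaving roughly a third of each machine's memory outside the executor heap.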
After the image is created and the script is saved, create the cluster as defined here using the script created above. The only required flags are --image and --initialization-actions.
gcloud dataproc clusters create sparklingwaterdataproc \
    --image=<myswdataprocimage> \
    --initialization-actions=gs://<bucket>/<initialization_script.sh>
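When cluster creation is automated, the same command can be assembled programmatically. The following is a minimal sketch that uses only the two flags named above; the helper name, the image name, and the bucket path are hypothetical placeholders. It prints the command for review and does not run it.

```python
import shlex


def build_cluster_create_command(cluster_name, image, init_script_uri):
    """Assemble the gcloud argument list for creating the cluster."""
    # The initialization script must already be stored in a GCS bucket.
    if not init_script_uri.startswith("gs://"):
        raise ValueError("initialization script must be a gs:// URI")
    return [
        "gcloud", "dataproc", "clusters", "create", cluster_name,
        "--image={}".format(image),
        "--initialization-actions={}".format(init_script_uri),
    ]


command = build_cluster_create_command(
    "sparklingwaterdataproc",
    "my-sw-dataproc-image",                     # hypothetical image name
    "gs://my-bucket/initialization_script.sh",  # hypothetical bucket and path
)
# Print a shell-safe rendering of the command for review.
print(" ".join(shlex.quote(part) for part in command))
```

To actually create the cluster, the list could be passed to subprocess.run(command, check=True) on a machine where gcloud is installed and authenticated.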
Upon successful completion, you will have a Dataproc cluster running Sparkling Water. You can now run jobs, for example:
gcloud dataproc jobs submit pyspark \
--cluster cluster-name --region region \
sample-script.py
Note: Dataproc does not automatically enable Spark logs. Refer to the relevant Stack Overflow answers for how to enable them.
Sample Script for Sparkling Water Job
Below is a sample script for running a Sparkling Water job. Edit the arguments to match your bucket and GCP setup.
import h2o
from h2o.automl import H2OAutoML
from pyspark.sql import SparkSession
from pysparkling import H2OContext
spark = SparkSession.builder.appName("SparklingWaterApp").getOrCreate()
hc = H2OContext.getOrCreate()
bucket = "h2o-bq-large-dataset"
train_path = "demos/cc_train.csv"
test_path = "demos/cc_test.csv"
y = "DEFAULT_PAYMENT_NEXT_MONTH"
is_classification = True
drop_cols = []
aml_args = {"max_runtime_secs": 120}
train_data = spark.read\
    .options(header='true', inferSchema='true')\
    .csv("gs://{}/{}".format(bucket, train_path))
test_data = spark.read\
    .options(header='true', inferSchema='true')\
    .csv("gs://{}/{}".format(bucket, test_path))
print("CREATING H2O FRAME")
training_frame = hc.asH2OFrame(train_data)
test_frame = hc.asH2OFrame(test_data)
x = training_frame.columns
x.remove(y)
for col in drop_cols:
    x.remove(col)
if is_classification:
    training_frame[y] = training_frame[y].asfactor()
else:
    print("REGRESSION: Not setting target column as factor")
print("TRAINING H2OAUTOML")
aml = H2OAutoML(**aml_args)
aml.train(x=x, y=y, training_frame=training_frame)
print(aml.leaderboard)
print('SUCCESS')
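The sample script builds the predictor list with x.remove(...), which raises a ValueError if a name in drop_cols is not present in the frame. The sketch below shows one possible alternative that ignores missing drop columns but still fails loudly when the target is absent. The helper name and the column names in the example are hypothetical, and the helper works on plain Python lists, so it can be tried without a cluster.

```python
def select_predictors(columns, target, drop_cols=()):
    """Return every column except the target and the columns to drop."""
    if target not in columns:
        raise ValueError("target column {!r} not found".format(target))
    excluded = {target} | set(drop_cols)
    # Preserve the original column order.
    return [col for col in columns if col not in excluded]


# Hypothetical column names, for illustration only.
columns = ["ID", "LIMIT_BAL", "AGE", "DEFAULT_PAYMENT_NEXT_MONTH"]
print(select_predictors(columns, "DEFAULT_PAYMENT_NEXT_MONTH", drop_cols=["ID"]))
```

In the sample script, the equivalent call would be x = select_predictors(training_frame.columns, y, drop_cols).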