Running Sparkling Water on Google Cloud Dataproc

This section describes how to run Sparkling Water on `Google Cloud Dataproc <>`__. 

Perform the following steps to start the Sparkling Water ``H2OContext`` on Cloud Dataproc.

**Note**: Be sure to allocate enough resources to handle your data. As a minimum, allocate memory equal to four times the size of your data in H2O.
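
As a rough illustration of this rule of thumb, the following sketch estimates the minimum memory to plan for a given dataset size. The function name ``min_cluster_memory_gb`` is hypothetical and not part of Sparkling Water; the factor of 4 comes from the recommendation above. Treat the result as a lower bound, not a sizing guarantee.

.. code:: python

	def min_cluster_memory_gb(data_size_gb, factor=4):
	    """Return the minimum total memory (in GB) suggested for a dataset."""
	    if data_size_gb < 0:
	        raise ValueError("data_size_gb must be non-negative")
	    return data_size_gb * factor

	# A 10 GB dataset calls for at least 40 GB of memory across the cluster.
	print(min_cluster_memory_gb(10))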

1. Install the `Google SDK <>`__.

2. Install `Daisy <>`__. The path for Daisy will be required in a later step.

3. After Daisy is installed, download and save Google's ` <>`__. This custom image will be referenced in a later step.

4. Copy and save the following customization script. This will be required in a later step.

.. code:: bash

	apt-get update
	apt-get install -y python-dev python-pip jq

	cd /usr/lib/

	pip install pip==9.0.3
	pip install requests==2.18.4
	pip install tabulate
	pip install future
	pip install colorama
	pip install scikit-learn==0.19.1
	pip install
	pip install --upgrade google-cloud-bigquery
	pip install --upgrade google-cloud-storage

	pip install h2o_pysparkling_SUBST_SPARK_MAJOR_VERSION

5. Create a Dataproc custom image as defined `here <>`__. Note that this references the Daisy path, the custom image name, and the customization script from previous steps. The code will be similar to the following:

.. code:: bash

   python \
    --image-name <custom_image_name> \
    --dataproc-version <Cloud Dataproc version (example: "1.2.22")> \
    --customization-script <local path to your custom script> \
    --daisy-path <local path to the downloaded daisy binary> \
    --zone <Compute Engine zone to use for the location of the temporary VM> \
    --gcs-bucket <URI (gs://bucket-name) of a Cloud Storage bucket in your project> \
    [--no-smoke-test: <"true" or "false" (default: "false")>]

6. Copy and save the following initialization script into a GCS bucket that you have access to. This will be required when you create your cluster.

.. code:: bash

	#!/bin/bash

	CLUSTER_NAME=$(curl -f -s -H Metadata-Flavor:Google \

	cat << EOF > /usr/local/bin/
	#!/bin/bash
	# Helper to get current cluster state.
	function get_cluster_state() {
		echo \$(gcloud dataproc clusters describe ${CLUSTER_NAME} | \
			grep -A 1 "^status:" | grep "state:" | cut -d ':' -f 2)
	}

	# Helper which waits for RUNNING state before issuing the command.
	function await_and_submit() {
		local cluster_state=\$(get_cluster_state)
		echo "Current cluster state: \${cluster_state}"
		while [[ "\${cluster_state}" != 'RUNNING' ]]; do
			echo "Sleeping to await cluster health..."
			sleep 5
			cluster_state=\$(get_cluster_state)
			if [[ "\${cluster_state}" == 'ERROR' ]]; then
				echo "Giving up due to cluster state '\${cluster_state}'"
				exit 1
			fi
		done

		echo "Changing Spark Configurations"
		sudo sed -i 's/spark.dynamicAllocation.enabled true/spark.dynamicAllocation.enabled false/g' /usr/lib/spark/conf/spark-defaults.conf
		sudo sed -i 's/spark.executor.instances 10000/# spark.executor.instances 10000/g' /usr/lib/spark/conf/spark-defaults.conf
		sudo sed -i 's/spark.executor.cores.*/# removing unnecessary limits to executor cores/g' /usr/lib/spark/conf/spark-defaults.conf
		sudo sed -i 's/^spark.executor.memory.*/# removing unnecessary limits to executor memory/g' /usr/lib/spark/conf/spark-defaults.conf
		sudo echo "spark.executor.instances $(gcloud dataproc clusters describe ${CLUSTER_NAME} | grep "numInstances:" | tail -1 | sed "s/.*numInstances: //g")" >> /usr/lib/spark/conf/spark-defaults.conf
		sudo echo "spark.executor.cores $(gcloud compute machine-types describe $(gcloud dataproc clusters describe ${CLUSTER_NAME} | grep "machineTypeUri" | tail -1 | sed 's/.*machineTypeUri: //g') | grep "guestCpus" | sed 's/guestCpus: //g')" >> /usr/lib/spark/conf/spark-defaults.conf
		sudo echo "spark.executor.memory $(($(gcloud compute machine-types describe $(gcloud dataproc clusters describe ${CLUSTER_NAME} | grep "machineTypeUri" | tail -1 | sed 's/.*machineTypeUri: //g') | grep "memoryMb:" | sed 's/memoryMb: //g') * 65 / 100))m" >> /usr/lib/spark/conf/spark-defaults.conf
		echo "Successfully Changed spark-defaults.conf"

		cat /usr/lib/spark/conf/spark-defaults.conf
	}

	await_and_submit
	EOF

	chmod 750 /usr/local/bin/
	nohup /usr/local/bin/ &>> \
	/var/log/master-post-init.log &

7. After the image is created and the script is saved, create the cluster as defined `here <>`__ using the script created above. The only required flags are ``--image`` and ``--initialization-actions``.

.. code:: bash

  gcloud dataproc clusters create sparklingwaterdataproc \
   --image=<myswdataprocimage> \
   --initialization-actions=<GCS URI of your initialization script>
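
The initialization script from step 6 disables dynamic allocation and then appends three executor properties to ``spark-defaults.conf``: the instance count, the number of cores per machine, and 65% of the machine's memory. The sketch below restates that arithmetic in Python so the resulting values are easier to predict. The function name ``spark_executor_settings`` and the example inputs are assumptions for illustration; the script itself reads these values from ``gcloud``.

.. code:: python

	def spark_executor_settings(num_instances, guest_cpus, memory_mb):
	    """Mirror the values the initialization script appends to spark-defaults.conf."""
	    return {
	        "spark.executor.instances": str(num_instances),
	        "spark.executor.cores": str(guest_cpus),
	        # Integer arithmetic, matching the shell expression $((memoryMb * 65 / 100)).
	        "spark.executor.memory": "{}m".format(memory_mb * 65 // 100),
	    }

	# Assumed example inputs: 2 instances, 4 vCPUs and 15360 MB of memory per machine.
	for key, value in spark_executor_settings(2, 4, 15360).items():
	    print(key, value)

Comparing these values with the contents of ``spark-defaults.conf`` printed in the initialization log is one way to check that the script ran as expected.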

Upon successful completion, you will have a Dataproc cluster running Sparkling Water. You can now run jobs, for example:

.. code:: bash

  gcloud dataproc jobs submit pyspark \
    --cluster cluster-name --region region \
    <path to your PySpark script>
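
If you submit jobs from a Python tool and not from a shell, the same command can be assembled as an argument list. This is a minimal sketch: the helper name ``build_submit_command`` and the file name ``sample_script.py`` are hypothetical, and only the ``--cluster`` and ``--region`` flags shown above are used.

.. code:: python

	import shlex

	def build_submit_command(script_path, cluster, region):
	    """Assemble the argument list for submitting a PySpark job to Dataproc."""
	    return [
	        "gcloud", "dataproc", "jobs", "submit", "pyspark",
	        "--cluster", cluster,
	        "--region", region,
	        script_path,
	    ]

	cmd = build_submit_command("sample_script.py", "cluster-name", "region")
	# Print the command for inspection; this does not run gcloud.
	print(shlex.join(cmd))

To actually run the command, the list can be passed to ``subprocess.run(cmd, check=True)`` on a machine where the Google SDK is installed and authenticated.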

**Note**: Dataproc does not automatically enable Spark logs. Refer to the following Stack Overflow answers:

- `Google Dataproc Pyspark Properties <>`__
- `Where are the individual dataproc spark logs? <>`__

Sample Script for Sparkling Water Job

Below is a sample script for running a Sparkling Water job. Edit the arguments to match your bucket and GCP setup.

.. code:: python

	import h2o
	from h2o.automl import H2OAutoML
	from pyspark.sql import SparkSession
	from pysparkling import *

	spark = SparkSession.builder.appName("SparklingWaterApp").getOrCreate()
	hc = H2OContext.getOrCreate()

	bucket = "h2o-bq-large-dataset"
	train_path = "demos/cc_train.csv"
	test_path = "demos/cc_test.csv"
	is_classification = True

	drop_cols = []
	aml_args = {"max_runtime_secs": 120}

	train_data = spark.read\
	                  .options(header='true', inferSchema='true')\
	                  .csv("gs://{}/{}".format(bucket, train_path))
	test_data = spark.read\
	                 .options(header='true', inferSchema='true')\
	                 .csv("gs://{}/{}".format(bucket, test_path))

	training_frame = hc.asH2OFrame(train_data)
	test_frame = hc.asH2OFrame(test_data)

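	# Placeholder: set y to the name of the target column in your data.
	y = "target"
	x = training_frame.columns
	x.remove(y)

	# Exclude any columns that should not be used as predictors.
	for col in drop_cols:
	    x.remove(col)

	if is_classification:
	    training_frame[y] = training_frame[y].asfactor()
	else:
	    print("REGRESSION: Not setting target column as factor")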

	aml = H2OAutoML(**aml_args)
	aml.train(x=x, y=y, training_frame=training_frame)
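
The predictor list ``x`` passed to ``aml.train`` is every column except the target and any dropped columns. The following sketch shows that selection on a plain Python list so it can be checked without a running cluster. The helper name ``select_predictors`` and the column names are hypothetical.

.. code:: python

	def select_predictors(columns, y, drop_cols=()):
	    """Return the predictor columns: all columns except the target and dropped ones."""
	    excluded = {y, *drop_cols}
	    return [col for col in columns if col not in excluded]

	columns = ["id", "age", "balance", "default"]
	print(select_predictors(columns, y="default", drop_cols=["id"]))
	# ['age', 'balance']

Unlike calling ``x.remove(col)`` directly, this version does not raise an error when a name in ``drop_cols`` is absent from the frame, which may or may not be the behavior you want.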

