Source code for h2o.connection

"""
An H2OConnection represents the latest active handle to a cloud. No more than a single
H2OConnection object will be active at any one time.
"""

import requests
import math
import re
import os
import sys
import string
import time
import tempfile
import tabulate
import subprocess
import atexit
import pkg_resources
from two_dim_table import H2OTwoDimTable
import h2o

__H2OCONN__ = None            # the single active connection to H2O cloud
__H2O_REST_API_VERSION__ = 3  # const for the version of the rest api

[docs]class H2OConnection(object): """ H2OConnection is a class that represents a connection to the H2O cluster. It is specified by an IP address and a port number. Objects of type H2OConnection are not instantiated directly! This class contains static methods for performing the common REST methods GET, POST, and DELETE. """ def __init__(self, ip="localhost", port=54321, size=1, start_h2o=False, enable_assertions=False, license=None, max_mem_size_GB=None, min_mem_size_GB=None, ice_root=None, strict_version_check=True): """ Instantiate the package handle to the H2O cluster. :param ip: An IP address, default is "localhost" :param port: A port, default is 54321 :param size: THe expected number of h2o instances (ignored if start_h2o is True) :param start_h2o: A boolean dictating whether this module should start the H2O jvm. An attempt is made anyways if _connect fails. :param enable_assertions: If start_h2o, pass `-ea` as a VM option.s :param license: If not None, is a path to a license file. :param max_mem_size_GB: Maximum heap size (jvm option Xmx) in gigabytes. :param min_mem_size_GB: Minimum heap size (jvm option Xms) in gigabytes. :param ice_root: A temporary directory (default location is determined by tempfile.mkdtemp()) to hold H2O log files. :return: None """ port = as_int(port) if not (isinstance(port, int) and 0 <= port <= sys.maxint): raise ValueError("Port out of range, "+port) global __H2OCONN__ self._cld = None self._ip = ip self._port = port self._session_id = None self._rest_version = __H2O_REST_API_VERSION__ self._child = getattr(__H2OCONN__, "_child") if hasattr(__H2OCONN__, "_child") else None __H2OCONN__ = self if start_h2o: if not ice_root: ice_root = tempfile.mkdtemp() cld = self._start_local_h2o_jar(max_mem_size_GB, min_mem_size_GB, enable_assertions, license, ice_root) else: try: cld = self._connect(size) except: # try to start local jar or re-raise previous exception print print print "No instance found at ip and port: " + ip + ":" + str(port) + ". Trying to start local jar..." print print if os.path.exists(os.path.join(sys.prefix, "h2o_jar/h2o.jar")): if not ice_root: ice_root = tempfile.mkdtemp() cld = self._start_local_h2o_jar(max_mem_size_GB, min_mem_size_GB, enable_assertions, license, ice_root) else: print "No jar file found. Could not start local instance." raise __H2OCONN__._cld = cld ver_h2o = cld['version'] try: ver_pkg = pkg_resources.get_distribution("h2o").version except: ver_pkg = "UNKNOWN" if ver_h2o != ver_pkg: message = \ "Version mismatch. H2O is version {0}, but the python package is version {1}.".format(ver_h2o, str(ver_pkg)) if strict_version_check: raise EnvironmentError, message else: print "Warning: {0}".format(message) H2OConnection._cluster_info() @staticmethod def _cluster_info(): global __H2OCONN__ cld = __H2OCONN__._cld # self._session_id = self.get_session_id() ncpus = sum([n['num_cpus'] for n in cld['nodes']]) allowed_cpus = sum([n['cpus_allowed'] for n in cld['nodes']]) mmax = sum([n['max_mem'] for n in cld['nodes']]) cluster_health = all([n['healthy'] for n in cld['nodes']]) ip = "127.0.0.1" if __H2OCONN__._ip=="localhost" else __H2OCONN__._ip cluster_info = [ ["H2O cluster uptime: ", get_human_readable_time(cld["cloud_uptime_millis"])], ["H2O cluster version: ", cld["version"]], ["H2O cluster name: ", cld["cloud_name"]], ["H2O cluster total nodes: ", cld["cloud_size"]], ["H2O cluster total memory: ", get_human_readable_size(mmax)], ["H2O cluster total cores: ", str(ncpus)], ["H2O cluster allowed cores: ", str(allowed_cpus)], ["H2O cluster healthy: ", str(cluster_health)], ["H2O Connection ip: ", ip], ["H2O Connection port: ", __H2OCONN__._port], ] __H2OCONN__._cld = H2OConnection.get_json(url_suffix="Cloud") # update the cached version of cld h2o.H2ODisplay(cluster_info) def _connect(self, size, max_retries=5, print_dots=False): """ Does not actually "connect", instead simply tests that the cluster can be reached, is of a certain size, and is taking basic status commands. :param size: The number of H2O instances in the cloud. :return: The JSON response from a "stable" cluster. """ max_retries = max_retries retries = 0 while True: retries += 1 if print_dots: self._print_dots(retries) try: cld = H2OConnection.get_json(url_suffix="Cloud") if not cld['cloud_healthy']: raise ValueError("Cluster reports unhealthy status", cld) if cld['cloud_size'] >= size and cld['consensus']: if print_dots: print " Connection sucessful!" return cld except EnvironmentError: pass # Cloud too small or voting in progress; sleep; try again time.sleep(0.1) if retries > max_retries: raise EnvironmentError("Max retries exceeded. Could not establish link to the H2O cloud @ " + str(self._ip) + ":" + str(self._port)) def _print_dots(self, retries): sys.stdout.write("\rStarting H2O JVM and connecting: {}".format("." * retries)) sys.stdout.flush() def _start_local_h2o_jar(self, mmax, mmin, ea, license, ice): command = H2OConnection._check_java() if license: if not os.path.exists(license): raise ValueError("License file not found (" + license + ")") if not ice: raise ValueError("`ice_root` must be specified") stdout = open(H2OConnection._tmp_file("stdout"), 'w') stderr = open(H2OConnection._tmp_file("stderr"), 'w') print "Using ice_root: " + ice print jar_file = os.path.join(sys.prefix, "h2o_jar/h2o.jar") jver = subprocess.check_output([command, "-version"], stderr=subprocess.STDOUT) print print "Java Version: " + jver print if "GNU libgcj" in jver: raise ValueError("Sorry, GNU Java is not supported for H2O.\n"+ "Please download the latest Java SE JDK 7 from the following URL:\n"+ "http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html") if "Client VM" in jver: print "WARNING: " print "You have a 32-bit version of Java. H2O works best with 64-bit Java." print "Please download the latest Java SE JDK 7 from the following URL:" print "http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html" print vm_opts = [] if mmin: vm_opts += ["-Xms{}g".format(mmin)] if mmax: vm_opts += ["-Xmx{}g".format(mmax)] if ea: vm_opts += ["-ea"] h2o_opts = ["-jar", jar_file, "-name", "H2O_started_from_python", "-ip", "127.0.0.1", "-port", "54321", "-ice_root", ice, ] if license: h2o_opts += ["-license", license] cmd = [command] + vm_opts + h2o_opts cwd = os.path.abspath(os.getcwd()) self._child = subprocess.Popen(args=cmd, stdout=stdout, stderr=stderr, cwd=cwd) cld = self._connect(1, 30, True) return cld @staticmethod def _check_java(): # *WARNING* some over-engineering follows... :{ # is java in PATH? if H2OConnection._pwhich("java"): return H2OConnection._pwhich("java") # check if JAVA_HOME is set (for windoz) if os.getenv("JAVA_HOME"): return os.path.join(os.getenv("JAVA_HOME"), "bin", "java.exe") # check /Program Files/ and /Program Files (x86)/ if os is windoz if sys.platform == "windows": program_folder = os.path.join("C:", "{}", "Java") program_folders = [program_folder.format("Program Files"), program_folder.format("Program Files (x86)")] # check both possible program files... for folder in program_folders: # hunt down the jdk directory possible_jdk_dir = [d for d in folder if 'jdk' in d] # if got a non-empty list of jdk directory candidates if len(possible_jdk_dir) != 0: # loop over and check if the java.exe exists for jdk in possible_jdk_dir: path = os.path.join(folder, jdk, "bin", "java.exe") if os.path.exists(path): return path # check for JRE and warn for folder in program_folders: path = os.path.join(folder, "jre7", "bin", "java.exe") if os.path.exists(path): raise ValueError("Found JRE at " + path + "; but H2O requires the JDK to run.") else: raise ValueError("Cannot find Java. Please install the latest JDK from\n" +"http://www.oracle.com/technetwork/java/javase/downloads/index.html" ) @staticmethod def _pwhich(e): """ POSIX style which """ ok = os.X_OK if e: if os.access(e, ok): return e for path in os.getenv('PATH').split(os.pathsep): full_path = os.path.join(path, e) if os.access(full_path, ok): return full_path return None @staticmethod def _tmp_file(type): if sys.platform == "windows": usr = re.sub("[^A-Za-z0-9]", "_", os.getenv("USERNMAME")) else: usr = re.sub("[^A-Za-z0-9]", "_", os.getenv("USER")) if type == "stdout": path = os.path.join(tempfile.mkdtemp(), "h2o_{}_started_from_python.out".format(usr)) print "JVM stdout: " + path return path if type == "stderr": path = os.path.join(tempfile.mkdtemp(), "h2o_{}_started_from_python.err".format(usr)) print "JVM stderr: " + path return path if type == "pid": return os.path.join(tempfile.mkdtemp(), "h2o_{}_started_from_python.pid".format(usr)) raise ValueError("Unkown type in H2OConnection._tmp_file call: " + type) @staticmethod
[docs] def get_session_id(): return H2OConnection.get_json(url_suffix="InitID")["session_key"]
@staticmethod
[docs] def rest_version(): return __H2OCONN__._rest_version
@staticmethod
[docs] def session_id(): return __H2OCONN__._session_id
@staticmethod
[docs] def port(): return __H2OCONN__._port
@staticmethod
[docs] def ip(): return __H2OCONN__._ip
@staticmethod
[docs] def current_connection(): return __H2OCONN__
@staticmethod
[docs] def check_conn(): if not __H2OCONN__: raise EnvironmentError("No active connection to an H2O cluster. Try calling `h2o.init()`") return __H2OCONN__
""" Below is the REST implementation layer: _attempt_rest -- GET, POST, DELETE _do_raw_rest get post get_json post_json All methods are static and rely on an active __H2OCONN__ object. """ @staticmethod
[docs] def get(url_suffix, **kwargs): if __H2OCONN__ is None: raise ValueError("No h2o connection. Did you run `h2o.init()` ?") return __H2OCONN__._do_raw_rest(url_suffix, "GET", None, **kwargs)
@staticmethod
[docs] def post(url_suffix, file_upload_info=None, **kwargs): if __H2OCONN__ is None: raise ValueError("No h2o connection. Did you run `h2o.init()` ?") return __H2OCONN__._do_raw_rest(url_suffix, "POST", file_upload_info, **kwargs)
@staticmethod
[docs] def delete(url_suffix, **kwargs): if __H2OCONN__ is None: raise ValueError("No h2o connection. Did you run `h2o.init()` ?") return __H2OCONN__._do_raw_rest(url_suffix, "DELETE", None, **kwargs)
@staticmethod
[docs] def get_json(url_suffix, **kwargs): if __H2OCONN__ is None: raise ValueError("No h2o connection. Did you run `h2o.init()` ?") return __H2OCONN__._rest_json(url_suffix, "GET", None, **kwargs)
@staticmethod
[docs] def post_json(url_suffix, file_upload_info=None, **kwargs): if __H2OCONN__ is None: raise ValueError("No h2o connection. Did you run `h2o.init()` ?") return __H2OCONN__._rest_json(url_suffix, "POST", file_upload_info, **kwargs)
def _rest_json(self, url_suffix, method, file_upload_info, **kwargs): raw_txt = self._do_raw_rest(url_suffix, method, file_upload_info, **kwargs) return self._process_tables(raw_txt.json()) # Massage arguments into place, call _attempt_rest def _do_raw_rest(self, url_suffix, method, file_upload_info, **kwargs): if not url_suffix: raise ValueError("No url suffix supplied.") url = "http://{}:{}/{}/{}".format(self._ip,self._port,self._rest_version,url_suffix) query_string = "" for k,v in kwargs.iteritems(): if isinstance(v, list): x = '[' x += ','.join([str(l).encode("utf-8") for l in v]) x += ']' else: x = str(v).encode("utf-8") query_string += k+"="+x+"&" query_string = query_string[:-1] # Remove trailing extra & post_body = "" if not file_upload_info: if method == "POST": post_body = query_string elif query_string != '': url = "{}?{}".format(url, query_string) else: if not method == "POST": raise ValueError("Received file upload info and expected method to be POST. Got: " + str(method)) if query_string != '': url = "{}?{}".format(url, query_string) begin_time_seconds = time.time() http_result = self._attempt_rest(url, method, post_body, file_upload_info) end_time_seconds = time.time() elapsed_time_seconds = end_time_seconds - begin_time_seconds elapsed_time_millis = elapsed_time_seconds * 1000 if not http_result.ok: detailed_error_msgs = [] try: result = http_result.json() if 'messages' in result.keys(): detailed_error_msgs = '\n'.join([m['message'] for m in result['messages'] if m['message_type'] in \ ['ERROR']]) elif 'exception_msg' in result.keys(): detailed_error_msgs = result['exception_msg'] except ValueError: pass raise EnvironmentError(("h2o-py got an unexpected HTTP status code:\n {} {} (method = {}; url = {}). \n"+ \ "detailed error messages: {}") .format(http_result.status_code,http_result.reason,method,url,detailed_error_msgs)) # TODO: is.logging? -> write to logs # TODO: basically transform this R into Python # if (.h2o.isLogging()) { # .h2o.logRest("") # .h2o.logRest(sprintf("curlError: %s", as.character(.__curlError))) # .h2o.logRest(sprintf("curlErrorMessage: %s", .__curlErrorMessage)) # .h2o.logRest(sprintf("httpStatusCode: %d", httpStatusCode)) # .h2o.logRest(sprintf("httpStatusMessage: %s", httpStatusMessage)) # .h2o.logRest(sprintf("millis: %s", as.character(as.integer(deltaMillis)))) # .h2o.logRest("") # .h2o.logRest(payload) # .h2o.logRest("") # } return http_result # Low level request call def _attempt_rest(self, url, method, post_body, file_upload_info): headers = {'User-Agent': 'H2O Python client/'+string.replace(sys.version, '\n', '')} try: if method == "GET": return requests.get(url, headers=headers) elif file_upload_info: files = {file_upload_info["file"] : open(file_upload_info["file"], "rb")} return requests.post(url, files=files, headers=headers) elif method == "POST": return requests.post(url, data=post_body, headers=headers) elif method == "DELETE": return requests.delete(url, headers=headers) else: raise ValueError("Unknown HTTP method " + method) except requests.ConnectionError as e: raise EnvironmentError("h2o-py encountered an unexpected HTTP error:\n {}".format(e.message)) return http_result # TODO: # @staticmethod # def _process_matrices(x=None): # if x: # if isinstance(x, "dict"): # # return x @staticmethod def _process_tables(x=None): if x: if isinstance(x, dict): has_meta = "__meta" in x has_schema_type = has_meta and "schema_type" in x["__meta"] have_table = has_schema_type and x["__meta"]["schema_type"] == "TwoDimTable" if have_table: col_formats = [c["format"] for c in x["columns"]] table_header = x["name"] col_types = [c["type"] for c in x["columns"]] col_headers = [c["name"] for c in x["columns"]] row_headers = ["" for i in range(len(col_headers))] cell_values = x["data"] tbl = H2OTwoDimTable(row_headers, col_headers, col_types, table_header, cell_values, col_formats) x = tbl else: for k in x: x[k] = H2OConnection._process_tables(x[k]) if isinstance(x, list): for it in range(len(x)): x[it] = H2OConnection._process_tables(x[it]) return x
[docs]def get_human_readable_size(num): exp_str = [(0, 'B'), (10, 'KB'), (20, 'MB'), (30, 'GB'), (40, 'TB'), (50, 'PB'), ] i = 0 rounded_val = 0 while i + 1 < len(exp_str) and num >= (2 ** exp_str[i + 1][0]): i += 1 rounded_val = round(float(num) / 2 ** exp_str[i][0], 2) return '%s %s' % (rounded_val, exp_str[i][1])
[docs]def get_human_readable_time(epochTimeMillis): days = epochTimeMillis/(24*60*60*1000.0) hours = (days-math.floor(days))*24 minutes = (hours-math.floor(hours))*60 seconds = (minutes-math.floor(minutes))*60 milliseconds = (seconds-math.floor(seconds))*1000 duration_vec = [int(math.floor(t)) for t in [days,hours,minutes,seconds,milliseconds]] names_duration_vec = ["days","hours","minutes","seconds","milliseconds"] duration_dict = dict(zip(names_duration_vec, duration_vec)) readable_time = "" for name in names_duration_vec: if duration_dict[name] > 0: readable_time += str(duration_dict[name]) + " " + name + " " return readable_time
[docs]def is_int(possible_int): try: int(possible_int) return True except ValueError: return False
[docs]def as_int(the_int): if not is_int(the_int): raise ValueError("Not a valid int value: " + str(the_int)) return int(the_int)
def _kill_jvm_fork(): global __H2OCONN__ if __H2OCONN__ is not None: if __H2OCONN__._child: __H2OCONN__._child.kill() print "Successfully stopped H2O JVM started by the h2o python module." atexit.register(_kill_jvm_fork)