Source code for h2o.connection

"""
An H2OConnection represents the latest active handle to a cloud. No more than a single
H2OConnection object will be active at any one time.
"""
from __future__ import print_function
from __future__ import absolute_import
import requests
import math
import tempfile
import os
import re
import sys
import time
import subprocess
import atexit
import warnings
import site
from .display import H2ODisplay
from .h2o_logging import _is_logging, _log_rest
from .two_dim_table import H2OTwoDimTable
from .utils.shared_utils import quote
from six import iteritems, PY3
from string import ascii_lowercase, digits
from random import choice
warnings.simplefilter('always', UserWarning)
try:
  warnings.simplefilter('ignore', requests.packages.urllib3.exceptions.InsecureRequestWarning)
except:
  pass
__H2OCONN__ = None            # the single active connection to H2O cloud
__H2O_REST_API_VERSION__ = 3  # const for the version of the rest api


[docs]class H2OConnection(object): """ H2OConnection is a class that represents a connection to the H2O cluster. It is specified by an IP address and a port number. Objects of type H2OConnection are not instantiated directly! This class contains static methods for performing the common REST methods GET, POST, and DELETE. """ __ENCODING__ = "utf-8" __ENCODING_ERROR__ = "replace" def __init__(self, ip, port, start_h2o, enable_assertions, license, nthreads, max_mem_size, min_mem_size, ice_root, strict_version_check, proxy, https, insecure, username, password, max_mem_size_GB, min_mem_size_GB, proxies, size): """ Instantiate the package handle to the H2O cluster. :param ip: An IP address, default is "localhost" :param port: A port, default is 54321 :param start_h2o: A boolean dictating whether this module should start the H2O jvm. An attempt is made anyways if _connect fails. :param enable_assertions: If start_h2o, pass `-ea` as a VM option. :param license: If not None, is a path to a license file. :param nthreads: Number of threads in the thread pool. This relates very closely to the number of CPUs used. -1 means use all CPUs on the host. A positive integer specifies the number of CPUs directly. This value is only used when Python starts H2O. :param max_mem_size: Maximum heap size (jvm option Xmx) in gigabytes. :param min_mem_size: Minimum heap size (jvm option Xms) in gigabytes. :param ice_root: A temporary directory (default location is determined by tempfile.mkdtemp()) to hold H2O log files. :param strict_version_check: Setting this to False is unsupported and should only be done when advised by technical support. :param proxy: A dictionary with keys 'ftp', 'http', 'https' and values that correspond to a proxy path. :param https: Set this to True to use https instead of http. :param insecure: Set this to True to disable SSL certificate checking. :param username: Username to login with. :param password: Password to login with. :param max_mem_size_GB: DEPRECATED. Use max_mem_size. :param min_mem_size_GB: DEPRECATED. Use min_mem_size. :param proxies: DEPRECATED. Use proxy. :param size: DEPRECATED. :return: None """ port = as_int(port) if not (isinstance(port, int) and 0 <= port <= sys.maxsize): raise ValueError("Port out of range, "+port) if https != insecure: raise ValueError("`https` and `insecure` must both be True to enable HTTPS") #Deprecated params if max_mem_size_GB is not None: warnings.warn("`max_mem_size_GB` is deprecated. Use `max_mem_size` instead.", category=DeprecationWarning) max_mem_size = max_mem_size_GB if min_mem_size_GB is not None: warnings.warn("`min_mem_size_GB` is deprecated. Use `min_mem_size` instead.", category=DeprecationWarning) min_mem_size = min_mem_size_GB if proxies is not None: warnings.warn("`proxies` is deprecated. Use `proxy` instead.", category=DeprecationWarning) proxy = proxies if size is not None: warnings.warn("`size` is deprecated.", category=DeprecationWarning) global __H2OCONN__ self._cld = None self._ip = ip self._port = port self._proxy = proxy self._https = https self._insecure = insecure self._username = username self._password = password self._session_id = None self._rest_version = __H2O_REST_API_VERSION__ self._child = getattr(__H2OCONN__, "_child") if hasattr(__H2OCONN__, "_child") else None __H2OCONN__ = self #Give user warning if proxy environment variable is found. PUBDEV-2504 for name, value in os.environ.items(): if name.lower()[-6:] == '_proxy' and value: warnings.warn("Proxy environment variable `" + name + "` with value `" + value + "` found. This may interfere with your H2O Connection.") jarpaths = H2OConnection.jar_paths() if os.path.exists(jarpaths[0]): jar_path = jarpaths[0] elif os.path.exists(jarpaths[1]): jar_path = jarpaths[1] elif os.path.exists(jarpaths[2]): jar_path = jarpaths[2] elif os.path.exists(jarpaths[3]): jar_path = jarpaths[3] elif os.path.exists(jarpaths[4]): jar_path = jarpaths[4] else: jar_path = jarpaths[5] try: cld = self._connect() except: # try to start local jar or re-raise previous exception if not start_h2o: raise ValueError("Cannot connect to H2O server. Please check that H2O is running at {}".format(H2OConnection.make_url(""))) print() print() print("No instance found at ip and port: " + ip + ":" + str(port) + ". Trying to start local jar...") print() print() path_to_jar = os.path.exists(jar_path) if path_to_jar: if not ice_root: ice_root = tempfile.mkdtemp() cld = self._start_local_h2o_jar(max_mem_size, min_mem_size, enable_assertions, license, ice_root, jar_path, nthreads) else: print("No jar file found. Could not start local instance.") print("Jar Paths searched: ") for jp in jarpaths: print("\t" + jp) print() raise __H2OCONN__._cld = cld if strict_version_check and os.environ.get('H2O_DISABLE_STRICT_VERSION_CHECK') is None: ver_h2o = cld['version'] from .__init__ import __version__ ver_pkg = "UNKNOWN" if __version__ == "SUBST_PROJECT_VERSION" else __version__ if ver_h2o != ver_pkg: try: branch_name_h2o = cld['branch_name'] except KeyError: branch_name_h2o = None else: branch_name_h2o = cld['branch_name'] try: build_number_h2o = cld['build_number'] except KeyError: build_number_h2o = None else: build_number_h2o = cld['build_number'] if build_number_h2o is None: print("Version mismatch. H2O is version {0}, but the h2o-python package is version {1}. Upgrade H2O and h2o-Python to latest stable version - http://h2o-release.s3.amazonaws.com/h2o/latest_stable.html".format(ver_h2o, str(ver_pkg))) sys.exit("STOP: FIX VERSION MISMATCH TO AVOID FUTURE ERRORS") elif build_number_h2o == 'unknown': print("Version mismatch. H2O is version {0}, but the h2o-python package is version {1}. Upgrade H2O and h2o-Python to latest stable version - http://h2o-release.s3.amazonaws.com/h2o/latest_stable.html".format(ver_h2o, str(ver_pkg))) sys.exit("STOP: FIX VERSION MISMATCH TO AVOID FUTURE ERRORS") elif build_number_h2o == '99999': print("Version mismatch. H2O is version {0}, but the h2o-python package is version {1}. This is a developer build, please contact your developer.".format(ver_h2o, str(ver_pkg))) sys.exit("STOP: FIX VERSION MISMATCH TO AVOID FUTURE ERRORS") else: print("Version mismatch. H2O is version {0}, but the h2o-python package is version {1}.Install the matching h2o-Python version from - http://h2o-release.s3.amazonaws.com/h2o/{2}/{3}/index.html.".format(ver_h2o, str(ver_pkg),branch_name_h2o, build_number_h2o)) sys.exit("STOP: FIX VERSION MISMATCH TO AVOID FUTURE ERRORS") self._session_id = H2OConnection.get_json(url_suffix="InitID")["session_key"] H2OConnection._cluster_info() @staticmethod
[docs] def default(): H2OConnection.__ENCODING__ = "utf-8" H2OConnection.__ENCODING_ERROR__ = "replace"
@staticmethod
[docs] def jar_paths(): sys_prefix1 = sys_prefix2 = sys.prefix if sys_prefix1.startswith('/Library'): sys_prefix2 = '/System'+sys_prefix1 elif sys_prefix1.startswith('/System'): sys_prefix2 = sys_prefix1.split('/System')[1] return [os.path.join(sys_prefix1, "h2o_jar", "h2o.jar"), os.path.join(os.path.sep,"usr","local","h2o_jar","h2o.jar"), os.path.join(sys_prefix1, "local", "h2o_jar", "h2o.jar"), os.path.join(site.USER_BASE, "h2o_jar", "h2o.jar"), os.path.join(sys_prefix2, "h2o_jar", "h2o.jar"), os.path.join(sys_prefix2, "h2o_jar", "h2o.jar"), ]
@staticmethod def _cluster_info(): global __H2OCONN__ cld = __H2OCONN__._cld ncpus = sum([n['num_cpus'] for n in cld['nodes']]) allowed_cpus = sum([n['cpus_allowed'] for n in cld['nodes']]) mfree = sum([n['free_mem'] for n in cld['nodes']]) cluster_health = all([n['healthy'] for n in cld['nodes']]) ip = "127.0.0.1" if __H2OCONN__._ip=="localhost" else __H2OCONN__._ip cluster_info = [ ["H2O cluster uptime: ", get_human_readable_time(cld["cloud_uptime_millis"])], ["H2O cluster version: ", cld["version"]], ["H2O cluster name: ", cld["cloud_name"]], ["H2O cluster total nodes: ", cld["cloud_size"]], ["H2O cluster total free memory: ", get_human_readable_size(mfree)], ["H2O cluster total cores: ", str(ncpus)], ["H2O cluster allowed cores: ", str(allowed_cpus)], ["H2O cluster healthy: ", str(cluster_health)], ["H2O Connection ip: ", ip], ["H2O Connection port: ", __H2OCONN__._port], ["H2O Connection proxy: ", __H2OCONN__._proxy], ["Python Version: ", sys.version.split()[0]], ] __H2OCONN__._cld = H2OConnection.get_json(url_suffix="Cloud") # update the cached version of cld H2ODisplay(cluster_info) def _connect(self, size=1, max_retries=5, print_dots=False): """ Does not actually "connect", instead simply tests that the cluster can be reached, is of a certain size, and is taking basic status commands. :param size: The number of H2O instances in the cloud. :return: The JSON response from a "stable" cluster. """ retries = 0 while True: retries += 1 if print_dots: self._print_dots(retries) try: cld = H2OConnection.get_json(url_suffix="Cloud") if not cld['cloud_healthy']: raise ValueError("Cluster reports unhealthy status", cld) if cld['cloud_size'] >= size and cld['consensus']: if print_dots: print(" Connection successful!") return cld except EnvironmentError: pass # Cloud too small or voting in progress; sleep; try again time.sleep(0.1) if retries > max_retries: raise EnvironmentError("Max retries exceeded. Could not establish link to the H2O cloud @ " + str(self._ip) + ":" + str(self._port)) def _print_dots(self, retries): sys.stdout.write("\rStarting H2O JVM and connecting: {}".format("." * retries)) sys.stdout.flush() def _start_local_h2o_jar(self, mmax, mmin, ea, license, ice, jar_path, nthreads): command = H2OConnection._check_java() if license: if not os.path.exists(license): raise ValueError("License file not found (" + license + ")") if not ice: raise ValueError("`ice_root` must be specified") stdout = open(H2OConnection._tmp_file("stdout"), 'w') stderr = open(H2OConnection._tmp_file("stderr"), 'w') print("Using ice_root: " + ice) print() jver = subprocess.check_output([command, "-version"], stderr=subprocess.STDOUT) if PY3: jver = str(jver, H2OConnection.__ENCODING__) print() print("Java Version: " + jver) print() if "GNU libgcj" in jver: raise ValueError("Sorry, GNU Java is not supported for H2O.\n"+ "Please download the latest Java SE JDK 7 from the following URL:\n"+ "http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html") if "Client VM" in jver: print("WARNING: ") print("You have a 32-bit version of Java. H2O works best with 64-bit Java.") print("Please download the latest Java SE JDK 7 from the following URL:") print("http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html") print() vm_opts = [] if mmin: vm_opts += ["-Xms{}g".format(mmin)] if mmax: vm_opts += ["-Xmx{}g".format(mmax)] if ea: vm_opts += ["-ea"] h2o_opts = ["-verbose:gc", "-XX:+PrintGCDetails", "-XX:+PrintGCTimeStamps", "-jar", jar_path, "-name", "H2O_started_from_python_" + re.sub("[^A-Za-z0-9]", "_", (os.getenv("USERNAME") if sys.platform == "win32" else os.getenv("USER")) or "unknownUser") + "_" + "".join([choice(ascii_lowercase) for _ in range(3)] + [choice(digits) for _ in range(3)]), "-ip", "127.0.0.1", "-port", "54321", "-ice_root", ice, ] if nthreads > 0: h2o_opts += ["-nthreads", str(nthreads)] if license: h2o_opts += ["-license", license] cmd = [command] + vm_opts + h2o_opts cwd = os.path.abspath(os.getcwd()) if sys.platform == "win32": self._child = subprocess.Popen(args=cmd,stdout=stdout,stderr=stderr,cwd=cwd,creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) else: self._child = subprocess.Popen(args=cmd, stdout=stdout, stderr=stderr, cwd=cwd, preexec_fn=os.setsid) cld = self._connect(1, 30, True) return cld @staticmethod def _check_java(): # *WARNING* some over-engineering follows... :{ # is java in PATH? if H2OConnection._pwhich("java"): return H2OConnection._pwhich("java") # check if JAVA_HOME is set (for windoz) if os.getenv("JAVA_HOME"): return os.path.join(os.getenv("JAVA_HOME"), "bin", "java.exe") # check /Program Files/ and /Program Files (x86)/ if os is windoz if sys.platform == "win32": program_folder = os.path.join("C:", "{}", "Java") program_folders = [program_folder.format("Program Files"), program_folder.format("Program Files (x86)")] # check both possible program files... for folder in program_folders: # hunt down the jdk directory possible_jdk_dir = [d for d in folder if 'jdk' in d] # if got a non-empty list of jdk directory candidates if len(possible_jdk_dir) != 0: # loop over and check if the java.exe exists for jdk in possible_jdk_dir: path = os.path.join(folder, jdk, "bin", "java.exe") if os.path.exists(path): return path # check for JRE and warn for folder in program_folders: path = os.path.join(folder, "jre7", "bin", "java.exe") if os.path.exists(path): raise ValueError("Found JRE at " + path + "; but H2O requires the JDK to run.") else: raise ValueError("Cannot find Java. Please install the latest JDK from\n" +"http://www.oracle.com/technetwork/java/javase/downloads/index.html" ) @staticmethod def _pwhich(e): """ POSIX style which """ ok = os.X_OK if e: if os.access(e, ok): return e for path in os.getenv('PATH').split(os.pathsep): full_path = os.path.join(path, e) if os.access(full_path, ok): return full_path return None @staticmethod def _tmp_file(type): usr = re.sub("[^A-Za-z0-9]", "_", (os.getenv("USERNAME") if sys.platform == "win32" else os.getenv("USER")) or "unknownUser") if type == "stdout": path = os.path.join(tempfile.mkdtemp(), "h2o_{}_started_from_python.out".format(usr)) print("JVM stdout: " + path) return path if type == "stderr": path = os.path.join(tempfile.mkdtemp(), "h2o_{}_started_from_python.err".format(usr)) print("JVM stderr: " + path) return path if type == "pid": return os.path.join(tempfile.mkdtemp(), "h2o_{}_started_from_python.pid".format(usr)) raise ValueError("Unkown type in H2OConnection._tmp_file call: " + type) @staticmethod def _shutdown(conn, prompt): """ Shut down the specified instance. All data will be lost. This method checks if H2O is running at the specified IP address and port, and if it is, shuts down that H2O instance. :param conn: An H2OConnection object containing the IP address and port of the server running H2O. :param prompt: A logical value indicating whether to prompt the user before shutting down the H2O server. :return: None """ global __H2OCONN__ if conn is None: raise ValueError("There is no H2O instance running.") try: if not conn.cluster_is_up(conn): raise ValueError("There is no H2O instance running at ip: {0} and port: " "{1}".format(conn.ip(), conn.port())) except: #H2O is already shutdown on the java side ip = conn.ip() port = conn.port() __H2OCONN__= None raise ValueError("The H2O instance running at {0}:{1} has already been shutdown.".format(ip, port)) if not isinstance(prompt, bool): raise ValueError("`prompt` must be TRUE or FALSE") if prompt: question = "Are you sure you want to shutdown the H2O instance running at {0}:{1} (Y/N)? ".format(conn.ip(), conn.port()) response = input(question) if PY3 else raw_input(question) else: response = "Y" if response == "Y" or response == "y": conn.post(url_suffix="Shutdown") __H2OCONN__ = None #so that the "Did you run `h2o.init()`" ValueError is triggered @staticmethod
[docs] def rest_version(): return __H2OCONN__._rest_version
@staticmethod
[docs] def session_id(): return __H2OCONN__._session_id
@staticmethod
[docs] def port(): return __H2OCONN__._port
@staticmethod
[docs] def ip(): return __H2OCONN__._ip
@staticmethod
[docs] def https(): return __H2OCONN__._https
@staticmethod
[docs] def username(): return __H2OCONN__._username
@staticmethod
[docs] def password(): return __H2OCONN__._password
@staticmethod
[docs] def insecure(): return __H2OCONN__._insecure
@staticmethod
[docs] def current_connection(): return __H2OCONN__
@staticmethod
[docs] def check_conn(): if not __H2OCONN__: raise EnvironmentError("No active connection to an H2O cluster. Try calling `h2o.init()`") return __H2OCONN__
@staticmethod
[docs] def cluster_is_up(conn): """ Determine if an H2O cluster is up or not :param conn: An H2OConnection object containing the IP address and port of the server running H2O. :return: TRUE if the cluster is up; FALSE otherwise """ if not isinstance(conn, H2OConnection): raise ValueError("`conn` must be an H2OConnection object") rv = conn.current_connection()._attempt_rest(url=("https" if conn.https() else "http") +"://{0}:{1}/".format(conn.ip(), conn.port()), method="GET", post_body="", file_upload_info="") if rv.status_code == 401: warnings.warn("401 Unauthorized Access. Did you forget to provide a username and password?") return rv.status_code == 200 or rv.status_code == 301
""" Below is the REST implementation layer: _attempt_rest -- GET, POST, DELETE _do_raw_rest get post get_json post_json All methods are static and rely on an active __H2OCONN__ object. """ @staticmethod
[docs] def make_url(url_suffix, _rest_version=None): scheme = "https" if H2OConnection.https() else "http" _rest_version = _rest_version or H2OConnection.rest_version() return "{}://{}:{}/{}/{}".format(scheme,H2OConnection.ip(),H2OConnection.port(),_rest_version,url_suffix)
@staticmethod
[docs] def get(url_suffix, **kwargs): if __H2OCONN__ is None: raise ValueError("No h2o connection. Did you run `h2o.init()` ?") return __H2OCONN__._do_raw_rest(url_suffix, "GET", None, **kwargs)
@staticmethod
[docs] def post(url_suffix, file_upload_info=None, **kwargs): if __H2OCONN__ is None: raise ValueError("No h2o connection. Did you run `h2o.init()` ?") return __H2OCONN__._do_raw_rest(url_suffix, "POST", file_upload_info, **kwargs)
@staticmethod
[docs] def delete(url_suffix, **kwargs): if __H2OCONN__ is None: raise ValueError("No h2o connection. Did you run `h2o.init()` ?") return __H2OCONN__._do_raw_rest(url_suffix, "DELETE", None, **kwargs)
@staticmethod
[docs] def get_json(url_suffix, **kwargs): if __H2OCONN__ is None: raise ValueError("No h2o connection. Did you run `h2o.init()` ?") return __H2OCONN__._rest_json(url_suffix, "GET", None, **kwargs)
@staticmethod
[docs] def post_json(url_suffix, file_upload_info=None, **kwargs): if __H2OCONN__ is None: raise ValueError("No h2o connection. Did you run `h2o.init()` ?") return __H2OCONN__._rest_json(url_suffix, "POST", file_upload_info, **kwargs)
def _rest_json(self, url_suffix, method, file_upload_info, **kwargs): raw_txt = self._do_raw_rest(url_suffix, method, file_upload_info, **kwargs) return self._process_tables(raw_txt.json()) # Massage arguments into place, call _attempt_rest def _do_raw_rest(self, url_suffix, method, file_upload_info, **kwargs): if not url_suffix: raise ValueError("No url suffix supplied.") # allow override of REST version, currently used for Rapids which is /99 if '_rest_version' in kwargs: _rest_version = kwargs['_rest_version'] del kwargs['_rest_version'] else: _rest_version = self._rest_version url = H2OConnection.make_url(url_suffix,_rest_version) query_string = "" for k,v in iteritems(kwargs): if isinstance(v, list): x = '[' for l in v: if isinstance(l,list): x += '[' x += ','.join([str(e) if PY3 else str(e).encode(H2OConnection.__ENCODING__, errors=H2OConnection.__ENCODING_ERROR__) for e in l]) x += ']' else: x += str(l) if PY3 else str(l).encode(H2OConnection.__ENCODING__, errors=H2OConnection.__ENCODING_ERROR__) x += ',' x = x[:-1] x += ']' else: x = str(v) if PY3 else str(v).encode(H2OConnection.__ENCODING__, errors=H2OConnection.__ENCODING_ERROR__) query_string += k+"="+quote(x)+"&" query_string = query_string[:-1] # Remove trailing extra & post_body = "" if not file_upload_info: if method == "POST": post_body = query_string elif query_string != '': url = "{}?{}".format(url, query_string) else: if not method == "POST": raise ValueError("Received file upload info and expected method to be POST. Got: " + str(method)) if query_string != '': url = "{}?{}".format(url, query_string) if _is_logging(): _log_rest("------------------------------------------------------------\n") _log_rest("\n") _log_rest("Time: {0}\n".format(time.strftime('Y-%m-%d %H:%M:%OS3'))) _log_rest("\n") _log_rest("{0} {1}\n".format(method, url)) _log_rest("postBody: {0}\n".format(post_body)) global _rest_ctr; _rest_ctr = _rest_ctr+1 begin_time_seconds = time.time() http_result = self._attempt_rest(url, method, post_body, file_upload_info) end_time_seconds = time.time() elapsed_time_seconds = end_time_seconds - begin_time_seconds elapsed_time_millis = elapsed_time_seconds * 1000 if not http_result.ok: detailed_error_msgs = [] try: result = http_result.json() if 'messages' in result.keys(): detailed_error_msgs = '\n'.join([m['message'] for m in result['messages'] if m['message_type'] in ['ERRR']]) elif 'exception_msg' in result.keys(): detailed_error_msgs = result['exception_msg'] except ValueError: pass raise EnvironmentError(("h2o-py got an unexpected HTTP status code:\n {} {} (method = {}; url = {}). \n"+ \ "detailed error messages: {}") .format(http_result.status_code,http_result.reason,method,url,detailed_error_msgs)) if _is_logging(): _log_rest("\n") _log_rest("httpStatusCode: {0}\n".format(http_result.status_code)) _log_rest("httpStatusMessage: {0}\n".format(http_result.reason)) _log_rest("millis: {0}\n".format(elapsed_time_millis)) _log_rest("\n") _log_rest("{0}\n".format(http_result.json())) _log_rest("\n") return http_result # Low level request call def _attempt_rest(self, url, method, post_body, file_upload_info): auth = (self._username, self._password) verify = not self._insecure headers = {'User-Agent': 'H2O Python client/'+sys.version.replace('\n','')} try: if method == "GET": return requests.get(url, headers=headers, proxies=self._proxy, auth=auth, verify=verify) elif file_upload_info: files = {file_upload_info["file"] : open(file_upload_info["file"], "rb")} return requests.post(url, files=files, headers=headers, proxies=self._proxy, auth=auth, verify=verify) elif method == "POST": headers["Content-Type"] = "application/x-www-form-urlencoded" return requests.post(url, data=post_body, headers=headers, proxies=self._proxy, auth=auth, verify=verify) elif method == "DELETE": return requests.delete(url, headers=headers, proxies=self._proxy, auth=auth, verify=verify) else: raise ValueError("Unknown HTTP method " + method) except requests.ConnectionError as e: raise EnvironmentError("h2o-py encountered an unexpected HTTP error:\n {}".format(e)) # TODO: # @staticmethod # def _process_matrices(x=None): # if x: # if isinstance(x, "dict"): # # return x @staticmethod def _process_tables(x=None): if x: if isinstance(x, dict): has_meta = "__meta" in x has_schema_type = has_meta and "schema_type" in x["__meta"] have_table = has_schema_type and x["__meta"]["schema_type"] == "TwoDimTable" if have_table: col_formats = [c["format"] for c in x["columns"]] table_header = x["name"] table_descr = x["description"] col_types = [c["type"] for c in x["columns"]] col_headers = [c["name"] for c in x["columns"]] row_headers = ["" for i in range(len(col_headers))] cell_values = x["data"] tbl = H2OTwoDimTable(row_header=row_headers, col_header=col_headers, col_types=col_types, table_header=table_header, raw_cell_values=cell_values, col_formats=col_formats,table_description=table_descr) x = tbl else: for k in x: x[k] = H2OConnection._process_tables(x[k]) if isinstance(x, list): for it in range(len(x)): x[it] = H2OConnection._process_tables(x[it]) return x global _rest_ctr _rest_ctr = 0 @staticmethod
[docs] def rest_ctr(): global _rest_ctr; return _rest_ctr # On exit, close the session to allow H2O to cleanup any temps
[docs]def end_session(): try: H2OConnection.delete(url_suffix="InitID") print("Sucessfully closed the H2O Session.") except: pass
[docs]def get_human_readable_size(num): exp_str = [(0, 'B'), (10, 'KB'), (20, 'MB'), (30, 'GB'), (40, 'TB'), (50, 'PB'), ] i = 0 rounded_val = 0 while i + 1 < len(exp_str) and num >= (2 ** exp_str[i + 1][0]): i += 1 rounded_val = round(float(num) / 2 ** exp_str[i][0], 2) return '%s %s' % (rounded_val, exp_str[i][1])
[docs]def get_human_readable_time(epochTimeMillis): days = epochTimeMillis/(24*60*60*1000.0) hours = (days-math.floor(days))*24 minutes = (hours-math.floor(hours))*60 seconds = (minutes-math.floor(minutes))*60 milliseconds = (seconds-math.floor(seconds))*1000 duration_vec = [int(math.floor(t)) for t in [days,hours,minutes,seconds,milliseconds]] names_duration_vec = ["days","hours","minutes","seconds","milliseconds"] duration_dict = dict(zip(names_duration_vec, duration_vec)) readable_time = "" for name in names_duration_vec: if duration_dict[name] > 0: readable_time += str(duration_dict[name]) + " " + name + " " return readable_time
[docs]def is_int(possible_int): try: int(possible_int) return True except ValueError: return False
[docs]def as_int(the_int): if not is_int(the_int): raise ValueError("Not a valid int value: " + str(the_int)) return int(the_int)
def _kill_jvm_fork(): global __H2OCONN__ if __H2OCONN__ is not None: if __H2OCONN__._child: __H2OCONN__._child.kill() print("Successfully stopped H2O JVM started by the h2o python module.") atexit.register(_kill_jvm_fork) atexit.register(end_session)