# -*- encoding: utf-8 -*-
"""
h2o -- module for using H2O services.
:copyright: (c) 2016 H2O.ai
:license: Apache License Version 2.0 (see LICENSE for details)
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import os
import warnings
import webbrowser
from .backend import H2OConnection
from .backend import H2OConnectionConf
from .backend import H2OLocalServer
from .base import Keyed
from .estimators.deeplearning import H2OAutoEncoderEstimator
from .estimators.deeplearning import H2ODeepLearningEstimator
from .estimators.deepwater import H2ODeepWaterEstimator
from .estimators.xgboost import H2OXGBoostEstimator
from .estimators.generic import H2OGenericEstimator
from .estimators.gbm import H2OGradientBoostingEstimator
from .estimators.glm import H2OGeneralizedLinearEstimator
from .estimators.glrm import H2OGeneralizedLowRankEstimator
from .estimators.kmeans import H2OKMeansEstimator
from .estimators.naive_bayes import H2ONaiveBayesEstimator
from .estimators.pca import H2OPrincipalComponentAnalysisEstimator
from .estimators.random_forest import H2ORandomForestEstimator
from .estimators.stackedensemble import H2OStackedEnsembleEstimator
from .estimators.word2vec import H2OWord2vecEstimator
from .estimators.isolation_forest import H2OIsolationForestEstimator
from .exceptions import H2OConnectionError, H2OValueError
from .expr import ExprNode
from .frame import H2OFrame
from .grid.grid_search import H2OGridSearch
from .job import H2OJob
from .model.model_base import ModelBase
from .transforms.decomposition import H2OSVD
from .utils.config import H2OConfigReader
from .utils.compatibility import * # NOQA
from .utils.shared_utils import check_frame_id, deprecated, gen_header, py_tmp_key, quoted
from .utils.typechecks import assert_is_type, assert_satisfies, BoundInt, BoundNumeric, I, is_type, numeric, U
logging.basicConfig()
# An IPython deprecation warning is triggered after h2o.init(). Remove this once the deprecation has been resolved
warnings.filterwarnings('ignore', category=DeprecationWarning, module='.*/IPython/.*')
h2oconn = None # type: H2OConnection
[docs]def connect(server=None, url=None, ip=None, port=None,
https=None, verify_ssl_certificates=None, cacert=None,
auth=None, proxy=None, cookies=None, verbose=True, config=None):
"""
Connect to an existing H2O server, remote or local.
There are two ways to connect to a server: either pass a `server` parameter containing an instance of
an H2OLocalServer, or specify `ip` and `port` of the server that you want to connect to.
:param server: An H2OLocalServer instance to connect to (optional).
:param url: Full URL of the server to connect to (can be used instead of `ip` + `port` + `https`).
:param ip: The ip address (or host name) of the server where H2O is running.
:param port: Port number that H2O service is listening to.
:param https: Set to True to connect via https:// instead of http://.
:param verify_ssl_certificates: When using https, setting this to False will disable SSL certificates verification.
:param cacert: Path to a CA bundle file or a directory with certificates of trusted CAs (optional).
:param auth: Either a (username, password) pair for basic authentication, an instance of h2o.auth.SpnegoAuth
or one of the requests.auth authenticator objects.
:param proxy: Proxy server address.
:param cookies: Cookie (or list of) to add to request
:param verbose: Set to False to disable printing connection status messages.
:param connection_conf: Connection configuration object encapsulating connection parameters.
:returns: the new :class:`H2OConnection` object.
"""
global h2oconn
if config:
if "connect_params" in config:
h2oconn = _connect_with_conf(config["connect_params"])
else:
h2oconn = _connect_with_conf(config)
else:
h2oconn = H2OConnection.open(server=server, url=url, ip=ip, port=port, https=https,
auth=auth, verify_ssl_certificates=verify_ssl_certificates, cacert=cacert,
proxy=proxy, cookies=cookies,
verbose=verbose)
if verbose:
h2oconn.cluster.show_status()
return h2oconn
[docs]def api(endpoint, data=None, json=None, filename=None, save_to=None):
"""
Perform a REST API request to a previously connected server.
This function is mostly for internal purposes, but may occasionally be useful for direct access to
the backend H2O server. It has same parameters as :meth:`H2OConnection.request <h2o.backend.H2OConnection.request>`.
"""
# type checks are performed in H2OConnection class
_check_connection()
return h2oconn.request(endpoint, data=data, json=json, filename=filename, save_to=save_to)
[docs]def connection():
"""Return the current :class:`H2OConnection` handler."""
return h2oconn
def version_check():
"""Used to verify that h2o-python module and the H2O server are compatible with each other."""
from .__init__ import __version__ as ver_pkg
ci = h2oconn.cluster
if not ci:
raise H2OConnectionError("Connection not initialized. Did you run h2o.connect()?")
ver_h2o = ci.version
if ver_pkg == "SUBST_PROJECT_VERSION": ver_pkg = "UNKNOWN"
if str(ver_h2o) != str(ver_pkg):
branch_name_h2o = ci.branch_name
build_number_h2o = ci.build_number
if build_number_h2o is None or build_number_h2o == "unknown":
raise H2OConnectionError(
"Version mismatch. H2O is version {0}, but the h2o-python package is version {1}. "
"Upgrade H2O and h2o-Python to latest stable version - "
"http://h2o-release.s3.amazonaws.com/h2o/latest_stable.html"
"".format(ver_h2o, ver_pkg))
elif build_number_h2o == "99999":
raise H2OConnectionError(
"Version mismatch. H2O is version {0}, but the h2o-python package is version {1}. "
"This is a developer build, please contact your developer."
"".format(ver_h2o, ver_pkg))
else:
raise H2OConnectionError(
"Version mismatch. H2O is version {0}, but the h2o-python package is version {1}. "
"Install the matching h2o-Python version from - "
"http://h2o-release.s3.amazonaws.com/h2o/{2}/{3}/index.html."
"".format(ver_h2o, ver_pkg, branch_name_h2o, build_number_h2o))
# Check age of the install
if ci.build_too_old:
print("Warning: Your H2O cluster version is too old ({})! Please download and install the latest "
"version from http://h2o.ai/download/".format(ci.build_age))
[docs]def init(url=None, ip=None, port=None, name=None, https=None, cacert=None, insecure=None, username=None, password=None,
cookies=None, proxy=None, start_h2o=True, nthreads=-1, ice_root=None, log_dir=None, log_level=None,
enable_assertions=True, max_mem_size=None, min_mem_size=None, strict_version_check=None, ignore_config=False,
extra_classpath=None, jvm_custom_args=None, bind_to_localhost=True, **kwargs):
"""
Attempt to connect to a local server, or if not successful start a new server and connect to it.
:param url: Full URL of the server to connect to (can be used instead of `ip` + `port` + `https`).
:param ip: The ip address (or host name) of the server where H2O is running.
:param port: Port number that H2O service is listening to.
:param name: Cluster name. If None while connecting to an existing cluster it will not check the cluster name.
If set then will connect only if the target cluster name matches. If no instance is found and decides to start a local
one then this will be used as the cluster name or a random one will be generated if set to None.
:param https: Set to True to connect via https:// instead of http://.
:param cacert: Path to a CA bundle file or a directory with certificates of trusted CAs (optional).
:param insecure: When using https, setting this to True will disable SSL certificates verification.
:param username: Username and
:param password: Password for basic authentication.
:param cookies: Cookie (or list of) to add to each request.
:param proxy: Proxy server address.
:param start_h2o: If False, do not attempt to start an h2o server when connection to an existing one failed.
:param nthreads: "Number of threads" option when launching a new h2o server.
:param ice_root: Directory for temporary files for the new h2o server.
:param log_dir: Directory for H2O logs to be stored if a new instance is started. Ignored if connecting to an existing node.
:param log_level: The logger level for H2O if a new instance is started. One of TRACE,DEBUG,INFO,WARN,ERRR,FATA. Default is INFO. Ignored if connecting to an existing node.
:param enable_assertions: Enable assertions in Java for the new h2o server.
:param max_mem_size: Maximum memory to use for the new h2o server. Integer input will be evaluated as gigabytes. Other units can be specified by passing in a string (e.g. "160M" for 160 megabytes).
:param min_mem_size: Minimum memory to use for the new h2o server. Integer input will be evaluated as gigabytes. Other units can be specified by passing in a string (e.g. "160M" for 160 megabytes).
:param strict_version_check: If True, an error will be raised if the client and server versions don't match.
:param ignore_config: Indicates whether a processing of a .h2oconfig file should be conducted or not. Default value is False.
:param extra_classpath: List of paths to libraries that should be included on the Java classpath when starting H2O from Python.
:param kwargs: (all other deprecated attributes)
:param jvm_custom_args: Customer, user-defined argument's for the JVM H2O is instantiated in. Ignored if there is an instance of H2O already running and the client connects to it.
:param bind_to_localhost: A flag indicating whether access to the H2O instance should be restricted to the local machine (default) or if it can be reached from other computers on the network.
"""
global h2oconn
assert_is_type(url, str, None)
assert_is_type(ip, str, None)
assert_is_type(port, int, str, None)
assert_is_type(name, str, None)
assert_is_type(https, bool, None)
assert_is_type(insecure, bool, None)
assert_is_type(username, str, None)
assert_is_type(password, str, None)
assert_is_type(cookies, str, [str], None)
assert_is_type(proxy, {str: str}, None)
assert_is_type(start_h2o, bool, None)
assert_is_type(nthreads, int)
assert_is_type(ice_root, str, None)
assert_is_type(log_dir, str, None)
assert_is_type(log_level, str, None)
assert_satisfies(log_level, log_level in [None, "TRACE", "DEBUG", "INFO", "WARN", "ERRR", "FATA"])
assert_is_type(enable_assertions, bool)
assert_is_type(max_mem_size, int, str, None)
assert_is_type(min_mem_size, int, str, None)
assert_is_type(strict_version_check, bool, None)
assert_is_type(extra_classpath, [str], None)
assert_is_type(jvm_custom_args, [str], None)
assert_is_type(bind_to_localhost, bool)
assert_is_type(kwargs, {"proxies": {str: str}, "max_mem_size_GB": int, "min_mem_size_GB": int,
"force_connect": bool, "as_port": bool})
def get_mem_size(mmint, mmgb):
if not mmint: # treat 0 and "" as if they were None
if mmgb is None: return None
return mmgb << 30
if is_type(mmint, int):
# If the user gives some small number just assume it's in Gigabytes...
if mmint < 1000: return mmint << 30
return mmint
if is_type(mmint, str):
last = mmint[-1].upper()
num = mmint[:-1]
if not (num.isdigit() and last in "MGT"):
raise H2OValueError("Wrong format for a *_memory_size argument: %s (should be a number followed by "
"a suffix 'M', 'G' or 'T')" % mmint)
if last == "T": return int(num) << 40
if last == "G": return int(num) << 30
if last == "M": return int(num) << 20
scheme = "https" if https else "http"
proxy = proxy[scheme] if proxy is not None and scheme in proxy else \
kwargs["proxies"][scheme] if "proxies" in kwargs and scheme in kwargs["proxies"] else None
mmax = get_mem_size(max_mem_size, kwargs.get("max_mem_size_GB"))
mmin = get_mem_size(min_mem_size, kwargs.get("min_mem_size_GB"))
auth = (username, password) if username and password else None
check_version = True
verify_ssl_certificates = not insecure
# Apply the config file if ignore_config=False
if not ignore_config:
config = H2OConfigReader.get_config()
if url is None and ip is None and port is None and https is None and "init.url" in config:
url = config["init.url"]
if proxy is None and "init.proxy" in config:
proxy = config["init.proxy"]
if cookies is None and "init.cookies" in config:
cookies = config["init.cookies"].split(";")
if auth is None and "init.username" in config and "init.password" in config:
auth = (config["init.username"], config["init.password"])
if strict_version_check is None:
if "init.check_version" in config:
check_version = config["init.check_version"].lower() != "false"
elif os.environ.get("H2O_DISABLE_STRICT_VERSION_CHECK"):
check_version = False
else:
check_version = strict_version_check
# Note: `verify_ssl_certificates` is never None at this point => use `insecure` to check for None/default input)
if insecure is None and "init.verify_ssl_certificates" in config:
verify_ssl_certificates = config["init.verify_ssl_certificates"].lower() != "false"
if cacert is None:
if "init.cacert" in config:
cacert = config["init.cacert"]
assert_is_type(verify_ssl_certificates, bool)
if not start_h2o:
print("Warning: if you don't want to start local H2O server, then use of `h2o.connect()` is preferred.")
try:
h2oconn = H2OConnection.open(url=url, ip=ip, port=port, name=name, https=https,
verify_ssl_certificates=verify_ssl_certificates, cacert=cacert,
auth=auth, proxy=proxy,cookies=cookies, verbose=True,
_msgs=("Checking whether there is an H2O instance running at {url} ",
"connected.", "not found."))
except H2OConnectionError:
# Backward compatibility: in init() port parameter really meant "baseport" when starting a local server...
if port and not str(port).endswith("+") and not kwargs.get("as_port", False):
port = str(port) + "+"
if not start_h2o: raise
if ip and not (ip == "localhost" or ip == "127.0.0.1"):
raise H2OConnectionError('Can only start H2O launcher if IP address is localhost.')
hs = H2OLocalServer.start(nthreads=nthreads, enable_assertions=enable_assertions, max_mem_size=mmax,
min_mem_size=mmin, ice_root=ice_root, log_dir=log_dir, log_level=log_level,
port=port, name=name,
extra_classpath=extra_classpath, jvm_custom_args=jvm_custom_args,
bind_to_localhost=bind_to_localhost)
h2oconn = H2OConnection.open(server=hs, https=https, verify_ssl_certificates=verify_ssl_certificates,
cacert=cacert, auth=auth, proxy=proxy,cookies=cookies, verbose=True)
if check_version:
version_check()
h2oconn.cluster.timezone = "UTC"
h2oconn.cluster.show_status()
[docs]def lazy_import(path, pattern=None):
"""
Import a single file or collection of files.
:param path: A path to a data file (remote or local).
:param pattern: Character string containing a regular expression to match file(s) in the folder.
:returns: either a :class:`H2OFrame` with the content of the provided file, or a list of such frames if
importing multiple files.
"""
assert_is_type(path, str, [str])
assert_is_type(pattern, str, None)
paths = [path] if is_type(path, str) else path
return _import_multi(paths, pattern)
def _import_multi(paths, pattern):
assert_is_type(paths, [str])
assert_is_type(pattern, str, None)
j = api("POST /3/ImportFilesMulti", {"paths": paths, "pattern": pattern})
if j["fails"]: raise ValueError("ImportFiles of '" + ".".join(paths) + "' failed on " + str(j["fails"]))
return j["destination_frames"]
[docs]def upload_file(path, destination_frame=None, header=0, sep=None, col_names=None, col_types=None,
na_strings=None, skipped_columns=None):
"""
Upload a dataset from the provided local path to the H2O cluster.
Does a single-threaded push to H2O. Also see :meth:`import_file`.
:param path: A path specifying the location of the data to upload.
:param destination_frame: The unique hex key assigned to the imported file. If none is given, a key will
be automatically generated.
:param header: -1 means the first line is data, 0 means guess, 1 means first line is header.
:param sep: The field separator character. Values on each line of the file are separated by
this character. If not provided, the parser will automatically detect the separator.
:param col_names: A list of column names for the file.
:param col_types: A list of types or a dictionary of column names to types to specify whether columns
should be forced to a certain type upon import parsing. If a list, the types for elements that are
one will be guessed. The possible types a column may have are:
- "unknown" - this will force the column to be parsed as all NA
- "uuid" - the values in the column must be true UUID or will be parsed as NA
- "string" - force the column to be parsed as a string
- "numeric" - force the column to be parsed as numeric. H2O will handle the compression of the numeric
data in the optimal manner.
- "enum" - force the column to be parsed as a categorical column.
- "time" - force the column to be parsed as a time column. H2O will attempt to parse the following
list of date time formats: (date) "yyyy-MM-dd", "yyyy MM dd", "dd-MMM-yy", "dd MMM yy", (time)
"HH:mm:ss", "HH:mm:ss:SSS", "HH:mm:ss:SSSnnnnnn", "HH.mm.ss" "HH.mm.ss.SSS", "HH.mm.ss.SSSnnnnnn".
Times can also contain "AM" or "PM".
:param na_strings: A list of strings, or a list of lists of strings (one list per column), or a dictionary
of column names to strings which are to be interpreted as missing values.
:param skipped_columns: an integer lists of column indices to skip and not parsed into the final frame from the import file.
:returns: a new :class:`H2OFrame` instance.
:examples:
>>> frame = h2o.upload_file("/path/to/local/data")
"""
coltype = U(None, "unknown", "uuid", "string", "float", "real", "double", "int", "numeric",
"categorical", "factor", "enum", "time")
natype = U(str, [str])
assert_is_type(path, str)
assert_is_type(destination_frame, str, None)
assert_is_type(header, -1, 0, 1)
assert_is_type(sep, None, I(str, lambda s: len(s) == 1))
assert_is_type(col_names, [str], None)
assert_is_type(col_types, [coltype], {str: coltype}, None)
assert_is_type(na_strings, [natype], {str: natype}, None)
assert (skipped_columns==None) or isinstance(skipped_columns, list), \
"The skipped_columns should be an list of column names!"
check_frame_id(destination_frame)
if path.startswith("~"):
path = os.path.expanduser(path)
return H2OFrame()._upload_parse(path, destination_frame, header, sep, col_names, col_types, na_strings, skipped_columns)
[docs]def import_file(path=None, destination_frame=None, parse=True, header=0, sep=None, col_names=None, col_types=None,
na_strings=None, pattern=None, skipped_columns=None, custom_non_data_line_markers = None):
"""
Import a dataset that is already on the cluster.
The path to the data must be a valid path for each node in the H2O cluster. If some node in the H2O cluster
cannot see the file, then an exception will be thrown by the H2O cluster. Does a parallel/distributed
multi-threaded pull of the data. The main difference between this method and :func:`upload_file` is that
the latter works with local files, whereas this method imports remote files (i.e. files local to the server).
If you running H2O server on your own machine, then both methods behave the same.
:param path: path(s) specifying the location of the data to import or a path to a directory of files to import
:param destination_frame: The unique hex key assigned to the imported file. If none is given, a key will be
automatically generated.
:param parse: If True, the file should be parsed after import. If False, then a list is returned containing the file path.
:param header: -1 means the first line is data, 0 means guess, 1 means first line is header.
:param sep: The field separator character. Values on each line of the file are separated by
this character. If not provided, the parser will automatically detect the separator.
:param col_names: A list of column names for the file.
:param col_types: A list of types or a dictionary of column names to types to specify whether columns
should be forced to a certain type upon import parsing. If a list, the types for elements that are
one will be guessed. The possible types a column may have are:
- "unknown" - this will force the column to be parsed as all NA
- "uuid" - the values in the column must be true UUID or will be parsed as NA
- "string" - force the column to be parsed as a string
- "numeric" - force the column to be parsed as numeric. H2O will handle the compression of the numeric
data in the optimal manner.
- "enum" - force the column to be parsed as a categorical column.
- "time" - force the column to be parsed as a time column. H2O will attempt to parse the following
list of date time formats: (date) "yyyy-MM-dd", "yyyy MM dd", "dd-MMM-yy", "dd MMM yy", (time)
"HH:mm:ss", "HH:mm:ss:SSS", "HH:mm:ss:SSSnnnnnn", "HH.mm.ss" "HH.mm.ss.SSS", "HH.mm.ss.SSSnnnnnn".
Times can also contain "AM" or "PM".
:param na_strings: A list of strings, or a list of lists of strings (one list per column), or a dictionary
of column names to strings which are to be interpreted as missing values.
:param pattern: Character string containing a regular expression to match file(s) in the folder if `path` is a
directory.
:param skipped_columns: an integer list of column indices to skip and not parsed into the final frame from the import file.
:param custom_non_data_line_markers: If a line in imported file starts with any character in given string it will NOT be imported. Empty string means all lines are imported, None means that default behaviour for given format will be used
:returns: a new :class:`H2OFrame` instance.
:examples:
>>> # Single file import
>>> iris = import_file("h2o-3/smalldata/iris.csv")
>>> # Return all files in the folder iris/ matching the regex r"iris_.*\.csv"
>>> iris_pattern = h2o.import_file(path = "h2o-3/smalldata/iris",
... pattern = "iris_.*\.csv")
"""
coltype = U(None, "unknown", "uuid", "string", "float", "real", "double", "int", "numeric",
"categorical", "factor", "enum", "time")
natype = U(str, [str])
assert_is_type(path, str, [str])
assert_is_type(pattern, str, None)
assert_is_type(destination_frame, str, None)
assert_is_type(parse, bool)
assert_is_type(header, -1, 0, 1)
assert_is_type(sep, None, I(str, lambda s: len(s) == 1))
assert_is_type(col_names, [str], None)
assert_is_type(col_types, [coltype], {str: coltype}, None)
assert_is_type(na_strings, [natype], {str: natype}, None)
assert isinstance(skipped_columns, (type(None), list)), "The skipped_columns should be an list of column names!"
check_frame_id(destination_frame)
patharr = path if isinstance(path, list) else [path]
if any(os.path.split(p)[0] == "~" for p in patharr):
raise H2OValueError("Paths relative to a current user (~) are not valid in the server environment. "
"Please use absolute paths if possible.")
if not parse:
return lazy_import(path, pattern)
else:
return H2OFrame()._import_parse(path, pattern, destination_frame, header, sep, col_names, col_types, na_strings,
skipped_columns, custom_non_data_line_markers)
[docs]def import_hive_table(database=None, table=None, partitions=None, allow_multi_format=False):
"""
Import Hive table to H2OFrame in memory.
Make sure to start H2O with Hive on classpath. Uses hive-site.xml on classpath to connect to Hive.
:param database: Name of Hive database (default database will be used by default)
:param table: name of Hive table to import
:param partitions: a list of lists of strings - partition key column values of partitions you want to import.
:param allow_multi_format: enable import of partitioned tables with different storage formats used. WARNING:
this may fail on out-of-memory for tables with a large number of small partitions.
:returns: an :class:`H2OFrame` containing data of the specified Hive table.
:examples:
>>> my_citibike_data = h2o.import_hive_table("default", "table", [["2017", "01"], ["2017", "02"]])
"""
assert_is_type(database, str, None)
assert_is_type(table, str)
assert_is_type(partitions, [[str]], None)
p = { "database": database, "table": table, "partitions": partitions, "allow_multi_format": allow_multi_format }
j = H2OJob(api("POST /3/ImportHiveTable", data=p), "Import Hive Table").poll()
return get_frame(j.dest_key)
[docs]def import_sql_table(connection_url, table, username, password, columns=None, optimize=True, fetch_mode=None):
"""
Import SQL table to H2OFrame in memory.
Assumes that the SQL table is not being updated and is stable.
Runs multiple SELECT SQL queries concurrently for parallel ingestion.
Be sure to start the h2o.jar in the terminal with your downloaded JDBC driver in the classpath::
java -cp <path_to_h2o_jar>:<path_to_jdbc_driver_jar> water.H2OApp
Also see :func:`import_sql_select`.
Currently supported SQL databases are MySQL, PostgreSQL, MariaDB, Hive, Oracle and Microsoft SQL.
:param connection_url: URL of the SQL database connection as specified by the Java Database Connectivity (JDBC)
Driver. For example, "jdbc:mysql://localhost:3306/menagerie?&useSSL=false"
:param table: name of SQL table
:param columns: a list of column names to import from SQL table. Default is to import all columns.
:param username: username for SQL server
:param password: password for SQL server
:param optimize: DEPRECATED. Ignored - use fetch_mode instead. Optimize import of SQL table for faster imports.
:param fetch_mode: Set to DISTRIBUTED to enable distributed import. Set to SINGLE to force a sequential read by a single node
from the database.
:returns: an :class:`H2OFrame` containing data of the specified SQL table.
:examples:
>>> conn_url = "jdbc:mysql://172.16.2.178:3306/ingestSQL?&useSSL=false"
>>> table = "citibike20k"
>>> username = "root"
>>> password = "abc123"
>>> my_citibike_data = h2o.import_sql_table(conn_url, table, username, password)
"""
assert_is_type(connection_url, str)
assert_is_type(table, str)
assert_is_type(username, str)
assert_is_type(password, str)
assert_is_type(columns, [str], None)
assert_is_type(optimize, bool)
assert_is_type(fetch_mode, str, None)
p = {"connection_url": connection_url, "table": table, "username": username, "password": password,
"fetch_mode": fetch_mode}
if columns:
p["columns"] = ", ".join(columns)
j = H2OJob(api("POST /99/ImportSQLTable", data=p), "Import SQL Table").poll()
return get_frame(j.dest_key)
[docs]def import_sql_select(connection_url, select_query, username, password, optimize=True,
use_temp_table=None, temp_table_name=None, fetch_mode=None):
"""
Import the SQL table that is the result of the specified SQL query to H2OFrame in memory.
Creates a temporary SQL table from the specified sql_query.
Runs multiple SELECT SQL queries on the temporary table concurrently for parallel ingestion, then drops the table.
Be sure to start the h2o.jar in the terminal with your downloaded JDBC driver in the classpath::
java -cp <path_to_h2o_jar>:<path_to_jdbc_driver_jar> water.H2OApp
Also see h2o.import_sql_table. Currently supported SQL databases are MySQL, PostgreSQL, MariaDB, Hive, Oracle
and Microsoft SQL Server.
:param connection_url: URL of the SQL database connection as specified by the Java Database Connectivity (JDBC)
Driver. For example, "jdbc:mysql://localhost:3306/menagerie?&useSSL=false"
:param select_query: SQL query starting with `SELECT` that returns rows from one or more database tables.
:param username: username for SQL server
:param password: password for SQL server
:param optimize: DEPRECATED. Ignored - use fetch_mode instead. Optimize import of SQL table for faster imports.
:param use_temp_table: whether a temporary table should be created from select_query
:param temp_table_name: name of temporary table to be created from select_query
:param fetch_mode: Set to DISTRIBUTED to enable distributed import. Set to SINGLE to force a sequential read by a single node
from the database.
:returns: an :class:`H2OFrame` containing data of the specified SQL query.
:examples:
>>> conn_url = "jdbc:mysql://172.16.2.178:3306/ingestSQL?&useSSL=false"
>>> select_query = "SELECT bikeid from citibike20k"
>>> username = "root"
>>> password = "abc123"
>>> my_citibike_data = h2o.import_sql_select(conn_url, select_query,
... username, password, fetch_mode)
"""
assert_is_type(connection_url, str)
assert_is_type(select_query, str)
assert_is_type(username, str)
assert_is_type(password, str)
assert_is_type(optimize, bool)
assert_is_type(use_temp_table, bool, None)
assert_is_type(temp_table_name, str, None)
assert_is_type(fetch_mode, str, None)
p = {"connection_url": connection_url, "select_query": select_query, "username": username, "password": password,
"use_temp_table": use_temp_table, "temp_table_name": temp_table_name, "fetch_mode": fetch_mode}
j = H2OJob(api("POST /99/ImportSQLTable", data=p), "Import SQL Table").poll()
return get_frame(j.dest_key)
[docs]def parse_setup(raw_frames, destination_frame=None, header=0, separator=None, column_names=None,
column_types=None, na_strings=None, skipped_columns=None, custom_non_data_line_markers=None):
"""
Retrieve H2O's best guess as to what the structure of the data file is.
During parse setup, the H2O cluster will make several guesses about the attributes of
the data. This method allows a user to perform corrective measures by updating the
returning dictionary from this method. This dictionary is then fed into `parse_raw` to
produce the H2OFrame instance.
:param raw_frames: a collection of imported file frames
:param destination_frame: The unique hex key assigned to the imported file. If none is given, a key will
automatically be generated.
:param header: -1 means the first line is data, 0 means guess, 1 means first line is header.
:param separator: The field separator character. Values on each line of the file are separated by
this character. If not provided, the parser will automatically detect the separator.
:param column_names: A list of column names for the file. If skipped_columns are specified, only list column names
of columns that are not skipped.
:param column_types: A list of types or a dictionary of column names to types to specify whether columns
should be forced to a certain type upon import parsing. If a list, the types for elements that are
one will be guessed. If skipped_columns are specified, only list column types of columns that are not skipped.
The possible types a column may have are:
- "unknown" - this will force the column to be parsed as all NA
- "uuid" - the values in the column must be true UUID or will be parsed as NA
- "string" - force the column to be parsed as a string
- "numeric" - force the column to be parsed as numeric. H2O will handle the compression of the numeric
data in the optimal manner.
- "enum" - force the column to be parsed as a categorical column.
- "time" - force the column to be parsed as a time column. H2O will attempt to parse the following
list of date time formats: (date) "yyyy-MM-dd", "yyyy MM dd", "dd-MMM-yy", "dd MMM yy", (time)
"HH:mm:ss", "HH:mm:ss:SSS", "HH:mm:ss:SSSnnnnnn", "HH.mm.ss" "HH.mm.ss.SSS", "HH.mm.ss.SSSnnnnnn".
Times can also contain "AM" or "PM".
:param na_strings: A list of strings, or a list of lists of strings (one list per column), or a dictionary
of column names to strings which are to be interpreted as missing values.
:param skipped_columns: an integer lists of column indices to skip and not parsed into the final frame from the import file.
:param custom_non_data_line_markers: If a line in imported file starts with any character in given string it will NOT be imported. Empty string means all lines are imported, None means that default behaviour for given format will be used
:returns: a dictionary containing parse parameters guessed by the H2O backend.
"""
coltype = U(None, "unknown", "uuid", "string", "float", "real", "double", "int", "numeric",
"categorical", "factor", "enum", "time")
natype = U(str, [str])
assert_is_type(raw_frames, str, [str])
assert_is_type(destination_frame, None, str)
assert_is_type(header, -1, 0, 1)
assert_is_type(separator, None, I(str, lambda s: len(s) == 1))
assert_is_type(column_names, [str], None)
assert_is_type(column_types, [coltype], {str: coltype}, None)
assert_is_type(na_strings, [natype], {str: natype}, None)
check_frame_id(destination_frame)
# The H2O backend only accepts things that are quoted
if is_type(raw_frames, str): raw_frames = [raw_frames]
# temporary dictionary just to pass the following information to the parser: header, separator
kwargs = {"check_header": header, "source_frames": [quoted(frame_id) for frame_id in raw_frames]}
if separator:
kwargs["separator"] = ord(separator)
if custom_non_data_line_markers is not None:
kwargs["custom_non_data_line_markers"] = custom_non_data_line_markers;
j = api("POST /3/ParseSetup", data=kwargs)
if "warnings" in j and j["warnings"]:
for w in j["warnings"]:
warnings.warn(w)
# TODO: really should be url encoding...
if destination_frame:
j["destination_frame"] = destination_frame
parse_column_len = len(j["column_types"]) if skipped_columns is None else (len(j["column_types"])-len(skipped_columns))
tempColumnNames = j["column_names"] if j["column_names"] is not None else gen_header(j["number_columns"])
useType = [True]*len(tempColumnNames)
if skipped_columns is not None:
useType = [True]*len(tempColumnNames)
for ind in range(len(tempColumnNames)):
if ind in skipped_columns:
useType[ind]=False
if column_names is not None:
if not isinstance(column_names, list): raise ValueError("col_names should be a list")
if (skipped_columns is not None) and len(skipped_columns)>0:
if (len(column_names)) != parse_column_len:
raise ValueError(
"length of col_names should be equal to the number of columns parsed: %d vs %d"
% (len(column_names), parse_column_len))
else:
if len(column_names) != len(j["column_types"]): raise ValueError(
"length of col_names should be equal to the number of columns: %d vs %d"
% (len(column_names), len(j["column_types"])))
j["column_names"] = column_names
counter = 0
for ind in range(len(tempColumnNames)):
if useType[ind]:
tempColumnNames[ind]=column_names[counter]
counter=counter+1
if (column_types is not None): # keep the column types to include all columns
if isinstance(column_types, dict):
# overwrite dictionary to ordered list of column types. if user didn't specify column type for all names,
# use type provided by backend
if j["column_names"] is None: # no colnames discovered! (C1, C2, ...)
j["column_names"] = gen_header(j["number_columns"])
if not set(column_types.keys()).issubset(set(j["column_names"])): raise ValueError(
"names specified in col_types is not a subset of the column names")
idx = 0
column_types_list = []
for name in tempColumnNames: # column_names may have already been changed
if name in column_types:
column_types_list.append(column_types[name])
else:
column_types_list.append(j["column_types"][idx])
idx += 1
column_types = column_types_list
elif isinstance(column_types, list):
if len(column_types) != parse_column_len: raise ValueError(
"length of col_types should be equal to the number of parsed columns")
# need to expand it out to all columns, not just the parsed ones
column_types_list = j["column_types"]
counter = 0
for ind in range(len(j["column_types"])):
if useType[ind] and (column_types[counter]!=None):
column_types_list[ind]=column_types[counter]
counter=counter+1
column_types = column_types_list
else: # not dictionary or list
raise ValueError("col_types should be a list of types or a dictionary of column names to types")
j["column_types"] = column_types
if na_strings is not None:
if isinstance(na_strings, dict):
# overwrite dictionary to ordered list of lists of na_strings
if not j["column_names"]: raise ValueError("column names should be specified")
if not set(na_strings.keys()).issubset(set(j["column_names"])): raise ValueError(
"names specified in na_strings is not a subset of the column names")
j["na_strings"] = [[] for _ in range(len(j["column_names"]))]
for name, na in na_strings.items():
idx = j["column_names"].index(name)
if is_type(na, str): na = [na]
for n in na: j["na_strings"][idx].append(quoted(n))
elif is_type(na_strings, [[str]]):
if len(na_strings) != len(j["column_types"]):
raise ValueError("length of na_strings should be equal to the number of columns")
j["na_strings"] = [[quoted(na) for na in col] if col is not None else [] for col in na_strings]
elif isinstance(na_strings, list):
j["na_strings"] = [[quoted(na) for na in na_strings]] * len(j["column_types"])
else: # not a dictionary or list
raise ValueError(
"na_strings should be a list, a list of lists (one list per column), or a dictionary of column "
"names to strings which are to be interpreted as missing values")
if skipped_columns is not None:
if isinstance(skipped_columns, list):
j["skipped_columns"] = []
for colidx in skipped_columns:
if (colidx < 0): raise ValueError("skipped column index cannot be negative")
j["skipped_columns"].append(colidx)
# quote column names and column types also when not specified by user
if j["column_names"]: j["column_names"] = list(map(quoted, j["column_names"]))
j["column_types"] = list(map(quoted, j["column_types"]))
return j
[docs]def parse_raw(setup, id=None, first_line_is_header=0):
"""
Parse dataset using the parse setup structure.
:param setup: Result of ``h2o.parse_setup()``
:param id: an id for the frame.
:param first_line_is_header: -1, 0, 1 if the first line is to be used as the header
:returns: an :class:`H2OFrame` object.
"""
assert_is_type(setup, dict)
assert_is_type(id, str, None)
assert_is_type(first_line_is_header, -1, 0, 1)
check_frame_id(id)
if id:
setup["destination_frame"] = id
if first_line_is_header != (-1, 0, 1):
if first_line_is_header not in (-1, 0, 1): raise ValueError("first_line_is_header should be -1, 0, or 1")
setup["check_header"] = first_line_is_header
fr = H2OFrame()
fr._parse_raw(setup)
return fr
[docs]def assign(data, xid):
"""
(internal) Assign new id to the frame.
:param data: an H2OFrame whose id should be changed
:param xid: new id for the frame.
:returns: the passed frame.
"""
assert_is_type(data, H2OFrame)
assert_is_type(xid, str)
assert_satisfies(xid, xid != data.frame_id)
check_frame_id(xid)
data._ex = ExprNode("assign", xid, data)._eval_driver(False)
data._ex._cache._id = xid
data._ex._children = None
return data
[docs]def deep_copy(data, xid):
"""
Create a deep clone of the frame ``data``.
:param data: an H2OFrame to be cloned
:param xid: (internal) id to be assigned to the new frame.
:returns: new :class:`H2OFrame` which is the clone of the passed frame.
"""
assert_is_type(data, H2OFrame)
assert_is_type(xid, str)
assert_satisfies(xid, xid != data.frame_id)
check_frame_id(xid)
duplicate = data.apply(lambda x: x)
duplicate._ex = ExprNode("assign", xid, duplicate)._eval_driver(False)
duplicate._ex._cache._id = xid
duplicate._ex._children = None
return duplicate
[docs]def get_model(model_id):
"""
Load a model from the server.
:param model_id: The model identification in H2O
:returns: Model object, a subclass of H2OEstimator
"""
assert_is_type(model_id, str)
model_json = api("GET /3/Models/%s" % model_id)["models"][0]
algo = model_json["algo"]
if algo == "svd": m = H2OSVD()
elif algo == "pca": m = H2OPrincipalComponentAnalysisEstimator()
elif algo == "drf": m = H2ORandomForestEstimator()
elif algo == "naivebayes": m = H2ONaiveBayesEstimator()
elif algo == "kmeans": m = H2OKMeansEstimator()
elif algo == "glrm": m = H2OGeneralizedLowRankEstimator()
elif algo == "glm": m = H2OGeneralizedLinearEstimator()
elif algo == "gbm": m = H2OGradientBoostingEstimator()
elif algo == "deepwater": m = H2ODeepWaterEstimator()
elif algo == "xgboost": m = H2OXGBoostEstimator()
elif algo == "word2vec": m = H2OWord2vecEstimator()
elif algo == "generic": m = H2OGenericEstimator()
elif algo == "deeplearning":
if model_json["output"]["model_category"] == "AutoEncoder":
m = H2OAutoEncoderEstimator()
else:
m = H2ODeepLearningEstimator()
elif algo == "stackedensemble": m = H2OStackedEnsembleEstimator()
elif algo == "isolationforest": m = H2OIsolationForestEstimator()
else:
raise ValueError("Unknown algo type: " + algo)
m._resolve_model(model_id, model_json)
return m
[docs]def get_grid(grid_id):
"""
Return the specified grid.
:param grid_id: The grid identification in h2o
:returns: an :class:`H2OGridSearch` instance.
"""
assert_is_type(grid_id, str)
grid_json = api("GET /99/Grids/%s" % grid_id)
models = [get_model(key["name"]) for key in grid_json["model_ids"]]
# get first model returned in list of models from grid search to get model class (binomial, multinomial, etc)
first_model_json = api("GET /3/Models/%s" % grid_json["model_ids"][0]["name"])["models"][0]
gs = H2OGridSearch(None, {}, grid_id)
gs._resolve_grid(grid_id, grid_json, first_model_json)
gs.models = models
hyper_params = {param: set() for param in gs.hyper_names}
for param in gs.hyper_names:
for model in models:
if isinstance(model.full_parameters[param]["actual_value"], list):
hyper_params[param].add(model.full_parameters[param]["actual_value"][0])
else:
hyper_params[param].add(model.full_parameters[param]["actual_value"])
hyper_params = {str(param): list(vals) for param, vals in hyper_params.items()}
gs.hyper_params = hyper_params
gs.model = model.__class__()
return gs
[docs]def get_frame(frame_id, **kwargs):
"""
Obtain a handle to the frame in H2O with the frame_id key.
:param str frame_id: id of the frame to retrieve.
:returns: an :class:`H2OFrame` object
"""
assert_is_type(frame_id, str)
return H2OFrame.get_frame(frame_id, **kwargs)
[docs]def no_progress():
"""
Disable the progress bar from flushing to stdout.
The completed progress bar is printed when a job is complete so as to demarcate a log file.
"""
H2OJob.__PROGRESS_BAR__ = False
[docs]def show_progress():
"""Enable the progress bar (it is enabled by default)."""
H2OJob.__PROGRESS_BAR__ = True
[docs]def enable_expr_optimizations(flag):
"""Enable expression tree local optimizations."""
ExprNode.__ENABLE_EXPR_OPTIMIZATIONS__ = flag
def is_expr_optimizations_enabled():
return ExprNode.__ENABLE_EXPR_OPTIMIZATIONS__
[docs]def log_and_echo(message=""):
"""
Log a message on the server-side logs.
This is helpful when running several pieces of work one after the other on a single H2O
cluster and you want to make a notation in the H2O server side log where one piece of
work ends and the next piece of work begins.
Sends a message to H2O for logging. Generally used for debugging purposes.
:param message: message to write to the log.
"""
assert_is_type(message, str)
api("POST /3/LogAndEcho", data={"message": str(message)})
[docs]def remove(x, cascade=True):
"""
Remove object(s) from H2O.
:param x: H2OFrame, H2OEstimator, or string, or a list of those things: the object(s) or unique id(s)
pointing to the object(s) to be removed.
:param cascade: boolean, if set to TRUE (default), the object dependencies (e.g. submodels) are also removed.
"""
item_type = U(str, Keyed)
assert_is_type(x, item_type, [item_type])
if not isinstance(x, list): x = [x]
for xi in x:
if isinstance(xi, H2OFrame):
if xi.key is None: return # Lazy frame, never evaluated, nothing in cluster
rapids("(rm {})".format(xi.key))
xi.detach()
elif isinstance(xi, Keyed):
api("DELETE /3/DKV/%s" % xi.key, data=dict(cascade=cascade))
xi.detach()
else:
# string may be a Frame key name part of a rapids session... need to call rm thru rapids here
try:
rapids("(rm {})".format(xi))
except:
api("DELETE /3/DKV/%s" % xi, data=dict(cascade=cascade))
[docs]def remove_all(retained=None):
"""
Removes all objects from H2O with possibility to specify models and frames to retain.
Retained keys must be keys of models and frames only. For models retained, training and validation frames are retained as well.
Cross validation models of a retained model are NOT retained automatically, those must be specified explicitely.
:param retained: Keys of models and frames to retain
"""
params = {"retained_keys": retained}
api(endpoint="DELETE /3/DKV", data=params)
[docs]def rapids(expr):
"""
Execute a Rapids expression.
:param expr: The rapids expression (ascii string).
:returns: The JSON response (as a python dictionary) of the Rapids execution.
"""
assert_is_type(expr, str)
return ExprNode.rapids(expr)
[docs]def ls():
"""List keys on an H2O Cluster."""
return H2OFrame._expr(expr=ExprNode("ls")).as_data_frame(use_pandas=True)
[docs]def frame(frame_id):
"""
Retrieve metadata for an id that points to a Frame.
:param frame_id: the key of a Frame in H2O.
:returns: dict containing the frame meta-information.
"""
assert_is_type(frame_id, str)
return api("GET /3/Frames/%s" % frame_id)
[docs]def frames():
"""
Retrieve all the Frames.
:returns: Meta information on the frames
"""
return api("GET /3/Frames")
[docs]def download_pojo(model, path="", get_jar=True, jar_name=""):
"""
Download the POJO for this model to the directory specified by path; if path is "", then dump to screen.
:param model: the model whose scoring POJO should be retrieved.
:param path: an absolute path to the directory where POJO should be saved.
:param get_jar: retrieve the h2o-genmodel.jar also (will be saved to the same folder ``path``).
:param jar_name: Custom name of genmodel jar.
:returns: location of the downloaded POJO file.
"""
assert_is_type(model, ModelBase)
assert_is_type(path, str)
assert_is_type(get_jar, bool)
if not model.have_pojo:
raise H2OValueError("Export to POJO not supported")
if path == "":
java_code = api("GET /3/Models.java/%s" % model.model_id)
print(java_code)
return None
else:
filename = api("GET /3/Models.java/%s" % model.model_id, save_to=path)
if get_jar:
if jar_name == "":
api("GET /3/h2o-genmodel.jar", save_to=os.path.join(path, "h2o-genmodel.jar"))
else:
api("GET /3/h2o-genmodel.jar", save_to=os.path.join(path, jar_name))
return filename
[docs]def download_csv(data, filename):
"""
Download an H2O data set to a CSV file on the local disk.
Warning: Files located on the H2O server may be very large! Make sure you have enough
hard drive space to accommodate the entire file.
:param data: an H2OFrame object to be downloaded.
:param filename: name for the CSV file where the data should be saved to.
"""
assert_is_type(data, H2OFrame)
assert_is_type(filename, str)
return api("GET /3/DownloadDataset?frame_id=%s&hex_string=false" % data.frame_id, save_to=filename)
[docs]def download_all_logs(dirname=".", filename=None, container=None):
"""
Download H2O log files to disk.
:param dirname: a character string indicating the directory that the log file should be saved in.
:param filename: a string indicating the name that the CSV file should be.
Note that the default container format is .zip, so the file name must include the .zip extension.
:param container: a string indicating how to archive the logs, choice of "ZIP" (default) and "LOG"
ZIP: individual log files archived in a ZIP package
LOG: all log files will be concatenated together in one text file
:returns: path of logs written in a zip file.
:examples: The following code will save the zip file `'h2o_log.zip'` in a directory that is one down from where you are currently working into a directory called `your_directory_name`. (Please note that `your_directory_name` should be replaced with the name of the directory that you've created and that already exists.)
>>> h2o.download_all_logs(dirname='./your_directory_name/', filename = 'h2o_log.zip')
"""
assert_is_type(dirname, str)
assert_is_type(filename, str, None)
assert_is_type(container, "ZIP", "LOG", None)
type = "/%s" % container if container else ""
def save_to(resp):
path = os.path.join(dirname, filename if filename else h2oconn.save_to_detect(resp))
print("Writing H2O logs to " + path)
return path
return api("GET /3/Logs/download%s" % type, save_to=save_to)
[docs]def save_model(model, path="", force=False):
"""
Save an H2O Model object to disk. (Note that ensemble binary models can now be saved using this method.)
:param model: The model object to save.
:param path: a path to save the model at (hdfs, s3, local)
:param force: if True overwrite destination directory in case it exists, or throw exception if set to False.
:returns: the path of the saved model
:examples:
>>> path = h2o.save_model(my_model, dir=my_path)
"""
assert_is_type(model, ModelBase)
assert_is_type(path, str)
assert_is_type(force, bool)
path = os.path.join(os.getcwd() if path == "" else path, model.model_id)
return api("GET /99/Models.bin/%s" % model.model_id, data={"dir": path, "force": force})["dir"]
[docs]def load_model(path):
"""
Load a saved H2O model from disk. (Note that ensemble binary models can now be loaded using this method.)
:param path: the full path of the H2O Model to be imported.
:returns: an :class:`H2OEstimator` object
:examples:
>>> path = h2o.save_model(my_model, dir=my_path)
>>> h2o.load_model(path)
"""
assert_is_type(path, str)
res = api("POST /99/Models.bin/%s" % "", data={"dir": path})
return get_model(res["models"][0]["model_id"]["name"])
[docs]def export_file(frame, path, force=False, sep=",", compression=None, parts=1):
"""
Export a given H2OFrame to a path on the machine this python session is currently connected to.
:param frame: the Frame to save to disk.
:param path: the path to the save point on disk.
:param force: if True, overwrite any preexisting file with the same path
:param compression: how to compress the exported dataset (default none; gzip, bzip2 and snappy available)
:param parts: enables export to multiple 'part' files instead of just a single file.
Convenient for large datasets that take too long to store in a single file.
Use parts=-1 to instruct H2O to determine the optimal number of part files or
specify your desired maximum number of part files. Path needs to be a directory
when exporting to multiple files, also that directory must be empty.
Default is ``parts = 1``, which is to export to a single file.
"""
assert_is_type(frame, H2OFrame)
assert_is_type(path, str)
assert_is_type(sep, I(str, lambda s: len(s) == 1))
assert_is_type(force, bool)
assert_is_type(parts, int)
assert_is_type(compression, str, None)
H2OJob(api("POST /3/Frames/%s/export" % (frame.frame_id),
data={"path": path, "num_parts": parts, "force": force, "compression": compression, "separator": ord(sep)}),
"Export File").poll()
[docs]def cluster():
"""Return :class:`H2OCluster` object describing the backend H2O cluster."""
return h2oconn.cluster if h2oconn else None
[docs]def create_frame(frame_id=None, rows=10000, cols=10, randomize=True,
real_fraction=None, categorical_fraction=None, integer_fraction=None,
binary_fraction=None, time_fraction=None, string_fraction=None,
value=0, real_range=100, factors=100, integer_range=100,
binary_ones_fraction=0.02, missing_fraction=0.01,
has_response=False, response_factors=2, positive_response=False,
seed=None, seed_for_column_types=None):
"""
Create a new frame with random data.
Creates a data frame in H2O with real-valued, categorical, integer, and binary columns specified by the user.
:param frame_id: the destination key. If empty, this will be auto-generated.
:param rows: the number of rows of data to generate.
:param cols: the number of columns of data to generate. Excludes the response column if has_response is True.
:param randomize: If True, data values will be randomly generated. This must be True if either
categorical_fraction or integer_fraction is non-zero.
:param value: if randomize is False, then all real-valued entries will be set to this value.
:param real_range: the range of randomly generated real values.
:param real_fraction: the fraction of columns that are real-valued.
:param categorical_fraction: the fraction of total columns that are categorical.
:param factors: the number of (unique) factor levels in each categorical column.
:param integer_fraction: the fraction of total columns that are integer-valued.
:param integer_range: the range of randomly generated integer values.
:param binary_fraction: the fraction of total columns that are binary-valued.
:param binary_ones_fraction: the fraction of values in a binary column that are set to 1.
:param time_fraction: the fraction of randomly created date/time columns.
:param string_fraction: the fraction of randomly created string columns.
:param missing_fraction: the fraction of total entries in the data frame that are set to NA.
:param has_response: A logical value indicating whether an additional response column should be prepended to the
final H2O data frame. If set to True, the total number of columns will be ``cols + 1``.
:param response_factors: if has_response is True, then this variable controls the type of the "response" column:
setting response_factors to 1 will generate real-valued response, any value greater or equal than 2 will
create categorical response with that many categories.
:param positive_reponse: when response variable is present and of real type, this will control whether it
contains positive values only, or both positive and negative.
:param seed: a seed used to generate random values when ``randomize`` is True.
:param seed_for_column_types: a seed used to generate random column types when ``randomize`` is True.
:returns: an :class:`H2OFrame` object
"""
t_fraction = U(None, BoundNumeric(0, 1))
assert_is_type(frame_id, str, None)
assert_is_type(rows, BoundInt(1))
assert_is_type(cols, BoundInt(1))
assert_is_type(randomize, bool)
assert_is_type(value, numeric)
assert_is_type(real_range, BoundNumeric(0))
assert_is_type(real_fraction, t_fraction)
assert_is_type(categorical_fraction, t_fraction)
assert_is_type(integer_fraction, t_fraction)
assert_is_type(binary_fraction, t_fraction)
assert_is_type(time_fraction, t_fraction)
assert_is_type(string_fraction, t_fraction)
assert_is_type(missing_fraction, t_fraction)
assert_is_type(binary_ones_fraction, t_fraction)
assert_is_type(factors, BoundInt(1))
assert_is_type(integer_range, BoundInt(1))
assert_is_type(has_response, bool)
assert_is_type(response_factors, None, BoundInt(1))
assert_is_type(positive_response, bool)
assert_is_type(seed, int, None)
assert_is_type(seed_for_column_types, int, None)
check_frame_id(frame_id)
if randomize and value:
raise H2OValueError("Cannot set data to a `value` if `randomize` is true")
if (categorical_fraction or integer_fraction) and not randomize:
raise H2OValueError("`randomize` should be True when either categorical or integer columns are used.")
# The total column fraction that the user has specified explicitly. This sum should not exceed 1. We will respect
# all explicitly set fractions, and will auto-select the remaining fractions.
frcs = [real_fraction, categorical_fraction, integer_fraction, binary_fraction, time_fraction, string_fraction]
wgts = [0.5, 0.2, 0.2, 0.1, 0.0, 0.0]
sum_explicit_fractions = sum(0 if f is None else f for f in frcs)
count_explicit_fractions = sum(0 if f is None else 1 for f in frcs)
remainder = 1 - sum_explicit_fractions
if sum_explicit_fractions >= 1 + 1e-10:
raise H2OValueError("Fractions of binary, integer, categorical, time and string columns should add up "
"to a number less than 1.")
elif sum_explicit_fractions >= 1 - 1e-10:
# The fractions already add up to almost 1. No need to do anything (the server will absorb the tiny
# remainder into the real_fraction column).
pass
else:
# sum_explicit_fractions < 1 => distribute the remainder among the columns that were not set explicitly
if count_explicit_fractions == 6:
raise H2OValueError("Fraction of binary, integer, categorical, time and string columns add up to a "
"number less than 1.")
# Each column type receives a certain part (proportional to column's "weight") of the remaining fraction.
sum_implicit_weights = sum(wgts[i] if frcs[i] is None else 0 for i in range(6))
for i, f in enumerate(frcs):
if frcs[i] is not None: continue
if sum_implicit_weights == 0:
frcs[i] = remainder
else:
frcs[i] = remainder * wgts[i] / sum_implicit_weights
remainder -= frcs[i]
sum_implicit_weights -= wgts[i]
for i, f in enumerate(frcs):
if f is None:
frcs[i] = 0
real_fraction, categorical_fraction, integer_fraction, binary_fraction, time_fraction, string_fraction = frcs
parms = {"dest": frame_id if frame_id else py_tmp_key(append=h2oconn.session_id),
"rows": rows,
"cols": cols,
"randomize": randomize,
"categorical_fraction": categorical_fraction,
"integer_fraction": integer_fraction,
"binary_fraction": binary_fraction,
"time_fraction": time_fraction,
"string_fraction": string_fraction,
# "real_fraction" is not provided, the backend computes it as 1 - sum(5 other fractions)
"value": value,
"real_range": real_range,
"factors": factors,
"integer_range": integer_range,
"binary_ones_fraction": binary_ones_fraction,
"missing_fraction": missing_fraction,
"has_response": has_response,
"response_factors": response_factors,
"positive_response": positive_response,
"seed": -1 if seed is None else seed,
"seed_for_column_types": -1 if seed_for_column_types is None else seed_for_column_types,
}
H2OJob(api("POST /3/CreateFrame", data=parms), "Create Frame").poll()
return get_frame(parms["dest"])
[docs]def interaction(data, factors, pairwise, max_factors, min_occurrence, destination_frame=None):
"""
Categorical Interaction Feature Creation in H2O.
Creates a frame in H2O with n-th order interaction features between categorical columns, as specified by
the user.
:param data: the H2OFrame that holds the target categorical columns.
:param factors: factor columns (either indices or column names).
:param pairwise: If True, create pairwise interactions between factors (otherwise create one
higher-order interaction). Only applicable if there are 3 or more factors.
:param max_factors: Max. number of factor levels in pair-wise interaction terms (if enforced, one extra
catch-all factor will be made).
:param min_occurrence: Min. occurrence threshold for factor levels in pair-wise interaction terms
:param destination_frame: a string indicating the destination key. If empty, this will be auto-generated by H2O.
:returns: :class:`H2OFrame`
"""
assert_is_type(data, H2OFrame)
assert_is_type(factors, [str, int])
assert_is_type(pairwise, bool)
assert_is_type(max_factors, int)
assert_is_type(min_occurrence, int)
assert_is_type(destination_frame, str, None)
factors = [data.names[n] if is_type(n, int) else n for n in factors]
parms = {"dest": py_tmp_key(append=h2oconn.session_id) if destination_frame is None else destination_frame,
"source_frame": data.frame_id,
"factor_columns": [quoted(f) for f in factors],
"pairwise": pairwise,
"max_factors": max_factors,
"min_occurrence": min_occurrence,
}
H2OJob(api("POST /3/Interaction", data=parms), "Interactions").poll()
return get_frame(parms["dest"])
[docs]def as_list(data, use_pandas=True, header=True):
"""
Convert an H2O data object into a python-specific object.
WARNING! This will pull all data local!
If Pandas is available (and use_pandas is True), then pandas will be used to parse the
data frame. Otherwise, a list-of-lists populated by character data will be returned (so
the types of data will all be str).
:param data: an H2O data object.
:param use_pandas: If True, try to use pandas for reading in the data.
:param header: If True, return column names as first element in list
:returns: List of lists (Rows x Columns).
"""
assert_is_type(data, H2OFrame)
assert_is_type(use_pandas, bool)
assert_is_type(header, bool)
return H2OFrame.as_data_frame(data, use_pandas=use_pandas, header=header)
[docs]def demo(funcname, interactive=True, echo=True, test=False):
"""
H2O built-in demo facility.
:param funcname: A string that identifies the h2o python function to demonstrate.
:param interactive: If True, the user will be prompted to continue the demonstration after every segment.
:param echo: If True, the python commands that are executed will be displayed.
:param test: If True, `h2o.init()` will not be called (used for pyunit testing).
:example:
>>> import h2o
>>> h2o.demo("gbm")
"""
import h2o.demos as h2odemo
assert_is_type(funcname, str)
assert_is_type(interactive, bool)
assert_is_type(echo, bool)
assert_is_type(test, bool)
demo_function = getattr(h2odemo, funcname, None)
if demo_function and type(demo_function) is type(demo):
demo_function(interactive, echo, test)
else:
print("Demo for %s is not available." % funcname)
[docs]def load_dataset(relative_path):
"""Imports a data file within the 'h2o_data' folder."""
assert_is_type(relative_path, str)
h2o_dir = os.path.split(__file__)[0]
for possible_file in [os.path.join(h2o_dir, relative_path),
os.path.join(h2o_dir, "h2o_data", relative_path),
os.path.join(h2o_dir, "h2o_data", relative_path + ".csv")]:
if os.path.exists(possible_file):
return upload_file(possible_file)
# File not found -- raise an error!
raise H2OValueError("Data file %s cannot be found" % relative_path)
[docs]def make_metrics(predicted, actual, domain=None, distribution=None):
"""
Create Model Metrics from predicted and actual values in H2O.
:param H2OFrame predicted: an H2OFrame containing predictions.
:param H2OFrame actuals: an H2OFrame containing actual values.
:param domain: list of response factors for classification.
:param distribution: distribution for regression.
"""
assert_is_type(predicted, H2OFrame)
assert_is_type(actual, H2OFrame)
# assert predicted.ncol == 1, "`predicted` frame should have exactly 1 column"
assert actual.ncol == 1, "`actual` frame should have exactly 1 column"
assert_is_type(distribution, str, None)
assert_satisfies(actual.ncol, actual.ncol == 1)
if domain is None and any(actual.isfactor()):
domain = actual.levels()[0]
res = api("POST /3/ModelMetrics/predictions_frame/%s/actuals_frame/%s" % (predicted.frame_id, actual.frame_id),
data={"domain": domain, "distribution": distribution})
return res["model_metrics"]
[docs]def flow():
"""
Open H2O Flow in your browser.
"""
webbrowser.open(connection().base_url, new = 1)
def _put_key(file_path, dest_key=None, overwrite=True):
"""
Upload given file into DKV and save it under give key as raw object.
:param dest_key: name of destination key in DKV
:param file_path: path to file to upload
:return: key name if object was uploaded successfully
"""
ret = api("POST /3/PutKey?destination_key={}&overwrite={}".format(dest_key if dest_key else '', overwrite),
filename=file_path)
return ret["destination_key"]
def _create_zip_file(dest_filename, *content_list):
from .utils.shared_utils import InMemoryZipArch
with InMemoryZipArch(dest_filename) as zip_arch:
for filename, file_content in content_list:
zip_arch.append(filename, file_content)
return dest_filename
def _inspect_methods_separately(obj):
import inspect
class_def = "class {}:\n".format(obj.__name__)
for name, member in inspect.getmembers(obj):
if inspect.ismethod(member):
class_def += inspect.getsource(member)
elif inspect.isfunction(member):
class_def += inspect.getsource(member)
return class_def
def _default_source_provider(obj):
import inspect
# First try to get source code via inspect
try:
return ' '.join(inspect.getsourcelines(obj)[0])
except (OSError, TypeError):
# It seems like we are in interactive shell and
# we do not have access to class source code directly
# At this point we can:
# (1) get IPython history and find class definition, or
# (2) compose body of class from methods, since it is still possible to get
# method body
return _inspect_methods_separately(obj)
def _default_custom_distribution_source_provider(obj):
from h2o.utils.distributions import CustomDistributionGeneric
if CustomDistributionGeneric in obj.mro():
return _inspect_methods_separately(obj)
else:
return _default_source_provider(obj)
[docs]def upload_custom_metric(func, func_file="metrics.py", func_name=None, class_name=None, source_provider=None):
"""
Upload given metrics function into H2O cluster.
The metrics can have different representation:
- class: needs to implement map(pred, act, weight, offset, model), reduce(l, r) and metric(l) methods
- string: the same as in class case, but the class is given as a string
:param func: metric representation: string, class
:param func_file: internal name of file to save given metrics representation
:param func_name: name for h2o key under which the given metric is saved
:param class_name: name of class wrapping the metrics function (when supplied as string)
:param source_provider: a function which provides a source code for given function
:return: reference to uploaded metrics function
:examples:
>>> class CustomMaeFunc:
>>> def map(self, pred, act, w, o, model):
>>> return [abs(act[0] - pred[0]), 1]
>>>
>>> def reduce(self, l, r):
>>> return [l[0] + r[0], l[1] + r[1]]
>>>
>>> def metric(self, l):
>>> return l[0] / l[1]
>>>
>>>
>>> h2o.upload_custom_metric(CustomMaeFunc, func_name="mae")
>>>
>>> custom_func_str = '''class CustomMaeFunc:
>>> def map(self, pred, act, w, o, model):
>>> return [abs(act[0] - pred[0]), 1]
>>>
>>> def reduce(self, l, r):
>>> return [l[0] + r[0], l[1] + r[1]]
>>>
>>> def metric(self, l):
>>> return l[0] / l[1]'''
>>>
>>>
>>> h2o.upload_custom_metric(custom_func_str, class_name="CustomMaeFunc", func_name="mae")
"""
import tempfile
import inspect
# Use default source provider
if not source_provider:
source_provider = _default_source_provider
# The template wraps given metrics representation
_CFUNC_CODE_TEMPLATE = """# Generated code
import water.udf.CMetricFunc as MetricFunc
# User given metric function as a class implementing
# 3 methods defined by interface CMetricFunc
{}
# Generated user metric which satisfies the interface
# of Java MetricFunc
class {}Wrapper({}, MetricFunc, object):
pass
"""
assert_satisfies(func, inspect.isclass(func) or isinstance(func, str),
"The argument func needs to be string or class !")
assert_satisfies(func_file, func_file is not None,
"The argument func_file is missing!")
assert_satisfies(func_file, func_file.endswith('.py'),
"The argument func_file needs to end with '.py'")
code = None
derived_func_name = None
module_name = func_file[:-3]
if isinstance(func, str):
assert_satisfies(class_name, class_name is not None,
"The argument class_name is missing! " +
"It needs to reference the class in given string!")
code = _CFUNC_CODE_TEMPLATE.format(func, class_name, class_name)
derived_func_name = "metrics_{}".format(class_name)
class_name = "{}.{}Wrapper".format(module_name, class_name)
else:
assert_satisfies(func, inspect.isclass(func), "The parameter `func` should be str or class")
for method in ['map', 'reduce', 'metric']:
assert_satisfies(func, method in func.__dict__, "The class `func` needs to define method `{}`".format(method))
assert_satisfies(class_name, class_name is None,
"If class is specified then class_name parameter needs to be None")
class_name = "{}.{}Wrapper".format(module_name, func.__name__)
derived_func_name = "metrics_{}".format(func.__name__)
code = _CFUNC_CODE_TEMPLATE.format(source_provider(func), func.__name__, func.__name__)
# If the func name is not given, use whatever we can derived from given definition
if not func_name:
func_name = derived_func_name
# Saved into jar file
tmpdir = tempfile.mkdtemp(prefix="h2o-func")
func_arch_file = _create_zip_file("{}/func.jar".format(tmpdir), (func_file, code))
# Upload into K/V
dest_key = _put_key(func_arch_file, dest_key=func_name)
# Reference
return "python:{}={}".format(dest_key, class_name)
def upload_custom_distribution(func, func_file="distributions.py", func_name=None, class_name=None, source_provider=None):
import tempfile
import inspect
# Use default source provider
if not source_provider:
source_provider = _default_custom_distribution_source_provider
# The template wraps given metrics representation
_CFUNC_CODE_TEMPLATE = """# Generated code
import water.udf.CDistributionFunc as DistributionFunc
# User given metric function as a class implementing
# 4 methods defined by interface CDistributionFunc
{}
# Generated user distribution which satisfies the interface
# of Java DistributionFunc
class {}Wrapper({}, DistributionFunc, object):
pass
"""
assert_satisfies(func, inspect.isclass(func) or isinstance(func, str),
"The argument func needs to be string or class !")
assert_satisfies(func_file, func_file is not None,
"The argument func_file is missing!")
assert_satisfies(func_file, func_file.endswith('.py'),
"The argument func_file needs to end with '.py'")
code = None
derived_func_name = None
module_name = func_file[:-3]
if isinstance(func, str):
assert_satisfies(class_name, class_name is not None,
"The argument class_name is missing! " +
"It needs to reference the class in given string!")
code = _CFUNC_CODE_TEMPLATE.format(func, class_name, class_name)
derived_func_name = "distributions_{}".format(class_name)
class_name = "{}.{}Wrapper".format(module_name, class_name)
else:
assert_satisfies(func, inspect.isclass(func), "The parameter `func` should be str or class")
for method in ['link', 'init', 'gamma', 'gradient']:
assert_satisfies(func, method in dir(func), "The class `func` needs to define method `{}`".format(method))
assert_satisfies(class_name, class_name is None,
"If class is specified then class_name parameter needs to be None")
class_name = "{}.{}Wrapper".format(module_name, func.__name__)
derived_func_name = "distributions_{}".format(func.__name__)
code = _CFUNC_CODE_TEMPLATE.format(source_provider(func), func.__name__, func.__name__)
# If the func name is not given, use whatever we can derived from given definition
if not func_name:
func_name = derived_func_name
# Saved into jar file
tmpdir = tempfile.mkdtemp(prefix="h2o-func")
func_arch_file = _create_zip_file("{}/func.jar".format(tmpdir), (func_file, code))
# Upload into K/V
dest_key = _put_key(func_arch_file, dest_key=func_name)
# Reference
return "python:{}={}".format(dest_key, class_name)
[docs]def import_mojo(mojo_path):
"""
Imports an existing MOJO model as an H2O model.
:param mojo_path: Path to the MOJO archive on the H2O's filesystem
:return: An H2OGenericEstimator instance embedding given MOJO
"""
if mojo_path == None:
raise TypeError("MOJO path may not be None")
mojo_estimator = H2OGenericEstimator.from_file(mojo_path)
print(mojo_estimator)
return mojo_estimator
[docs]def upload_mojo(mojo_path):
"""
Uploads an existing MOJO model from local filesystem into H2O and imports it as an H2O Generic Model.
:param mojo_path: Path to the MOJO archive on the user's local filesystem
:return: An H2OGenericEstimator instance embedding given MOJO
"""
response = api("POST /3/PostFile", filename=mojo_path)
frame_key = response["destination_frame"]
mojo_estimator = H2OGenericEstimator(model_key = get_frame(frame_key))
mojo_estimator.train()
print(mojo_estimator)
return mojo_estimator
#-----------------------------------------------------------------------------------------------------------------------
# Private
#-----------------------------------------------------------------------------------------------------------------------
def _check_connection():
if not h2oconn or not h2oconn.cluster:
raise H2OConnectionError("Not connected to a cluster. Did you run `h2o.connect()`?")
def _connect_with_conf(conn_conf):
conf = conn_conf
if isinstance(conn_conf, dict):
conf = H2OConnectionConf(config=conn_conf)
assert_is_type(conf, H2OConnectionConf)
return connect(url = conf.url, verify_ssl_certificates = conf.verify_ssl_certificates,
auth = conf.auth, proxy = conf.proxy,cookies = conf.cookies, verbose = conf.verbose)
#-----------------------------------------------------------------------------------------------------------------------
# ALL DEPRECATED METHODS BELOW
#-----------------------------------------------------------------------------------------------------------------------
# Deprecated since 2015-10-08
@deprecated("Deprecated, use ``h2o.import_file()``.")
def import_frame():
"""Deprecated."""
import_file()
# Deprecated since 2015-10-08
@deprecated("Deprecated (converted to a private method).")
def parse():
"""Deprecated."""
pass
# Deprecated since 2016-08-04
[docs]@deprecated("Deprecated, use ``h2o.cluster().show_status()``.")
def cluster_info():
"""Deprecated."""
_check_connection()
cluster().show_status()
# Deprecated since 2016-08-04
[docs]@deprecated("Deprecated, use ``h2o.cluster().show_status(True)``.")
def cluster_status():
"""Deprecated."""
_check_connection()
cluster().show_status(True)
# Deprecated since 2016-08-04
[docs]@deprecated("Deprecated, use ``h2o.cluster().shutdown()``.")
def shutdown(prompt=False):
"""Deprecated."""
_check_connection()
cluster().shutdown(prompt)
# Deprecated since 2016-08-04
[docs]@deprecated("Deprecated, use ``h2o.cluster().network_test()``.")
def network_test():
"""Deprecated."""
_check_connection()
cluster().network_test()
# Deprecated since 2016-08-04
[docs]@deprecated("Deprecated, use ``h2o.cluster().timezone``.")
def get_timezone():
"""Deprecated."""
_check_connection()
return cluster().timezone
# Deprecated since 2016-08-04
[docs]@deprecated("Deprecated, set ``h2o.cluster().timezone`` instead.")
def set_timezone(value):
"""Deprecated."""
_check_connection()
cluster().timezone = value
# Deprecated since 2016-08-04
[docs]@deprecated("Deprecated, use ``h2o.cluster().list_timezones()``.")
def list_timezones():
"""Deprecated."""
_check_connection()
return cluster().list_timezones()