Source code for h2o.backend.server

# -*- encoding: utf-8 -*-
"""
Local server.

`H2OLocalServer` allows to start H2O servers on your local machine:
    hs = H2OLocalServer.start() : start a new local server
    hs.is_running() : check if the server is running
    hs.shutdown() : shut down the server

:copyright: (c) 2016 H2O.ai
:license:   Apache License Version 2.0 (see LICENSE for details)
"""
from __future__ import absolute_import, division, print_function, unicode_literals
from h2o.utils.compatibility import *  # NOQA

import atexit
import os
import subprocess
import sys
import tempfile
import time
import re
from random import choice
from sysconfig import get_config_var
from warnings import warn

from h2o.exceptions import H2OServerError, H2OStartupError, H2OValueError
from h2o.utils.typechecks import assert_is_type, assert_satisfies, BoundInt, I, is_type

__all__ = ("H2OLocalServer", )



[docs]class H2OLocalServer(object): """ Handle to an H2O server launched locally. Public interface:: hs = H2OLocalServer.start(...) # launch a new local H2O server hs.is_running() # check if the server is running hs.shutdown() # shut down the server hs.scheme # either "http" or "https" hs.ip # ip address of the server, typically "127.0.0.1" hs.port # port on which the server is listening Once started, the server will run until the script terminates, or until you call `.shutdown()` on it. Moreover, if the server terminates with an exception, then the server will not stop and will continue to run even after Python process exits. This runaway process may end up being in a bad shape (e.g. frozen), then the only way to terminate it is to kill the java process from the terminal. Alternatively, it is possible to start the server as a context manager, in which case it will be automatically shut down even if an exception occurs in Python (but not if the Python process is killed):: with H2OLocalServer.start() as hs: # do something with the server -- probably connect to it """ ## (Avkash) Changing the maximum time to wait as 60 seconds to match with the same time in to R API. ## _TIME_TO_START = 60 # Maximum time we wait for the server to start up (in seconds) _TIME_TO_KILL = 3 # Maximum time we wait for the server to shut down until we kill it (in seconds) _BAD_JAVA_VERSION_RETURN_CODE_ = 3
[docs] @staticmethod def start(jar_path=None, nthreads=-1, enable_assertions=True, max_mem_size=None, min_mem_size=None, ice_root=None, log_dir=None, log_level=None, max_log_file_size=None, port="54321+", name=None, extra_classpath=None, verbose=True, jvm_custom_args=None, bind_to_localhost=True): """ Start new H2O server on the local machine. :param jar_path: Path to the h2o.jar executable. If not given, then we will search for h2o.jar in the locations returned by `._jar_paths()`. :param nthreads: Number of threads in the thread pool. This should be related to the number of CPUs used. -1 means use all CPUs on the host. A positive integer specifies the number of CPUs directly. :param enable_assertions: If True, pass `-ea` option to the JVM. :param max_mem_size: Maximum heap size (jvm option Xmx), in bytes. :param min_mem_size: Minimum heap size (jvm option Xms), in bytes. :param log_dir: Directory for H2O logs to be stored if a new instance is started. Default directory is determined by H2O internally. :param log_level: The logger level for H2O if a new instance is started. :param max_log_file_size: Maximum size of INFO and DEBUG log files. The file is rolled over after a specified size has been reached. (The default is 3MB. Minimum is 1MB and maximum is 99999MB) :param ice_root: A directory where H2O stores its temporary files. Default location is determined by tempfile.mkdtemp(). :param port: Port where to start the new server. This could be either an integer, or a string of the form "DDDDD+", indicating that the server should start looking for an open port starting from DDDDD and up. :param name: name of the h2o cluster to be started :param extra_classpath: List of paths to libraries that should be included on the Java classpath. :param verbose: If True, then connection info will be printed to the stdout. :param jvm_custom_args: Custom, user-defined arguments for the JVM H2O is instantiated in :param bind_to_localhost: A flag indicating whether access to the H2O instance should be restricted to the local machine (default) or if it can be reached from other computers on the network. Only applicable when H2O is started from the Python client. :returns: a new H2OLocalServer instance """ assert_is_type(jar_path, None, str) assert_is_type(port, None, int, str) assert_is_type(name, None, str) assert_is_type(nthreads, -1, BoundInt(1, 4096)) assert_is_type(enable_assertions, bool) assert_is_type(min_mem_size, None, int) assert_is_type(max_mem_size, None, BoundInt(1 << 25)) assert_is_type(log_dir, str, None) assert_is_type(log_level, str, None) assert_satisfies(log_level, log_level in [None, "TRACE", "DEBUG", "INFO", "WARN", "ERRR", "FATA"]) assert_is_type(max_log_file_size, str, None) assert_is_type(ice_root, None, I(str, os.path.isdir)) assert_is_type(extra_classpath, None, [str]) assert_is_type(jvm_custom_args, list, None) assert_is_type(bind_to_localhost, bool) if jar_path: assert_satisfies(jar_path, jar_path.endswith("h2o.jar")) if min_mem_size is not None and max_mem_size is not None and min_mem_size > max_mem_size: raise H2OValueError("`min_mem_size`=%d is larger than the `max_mem_size`=%d" % (min_mem_size, max_mem_size)) if port is None: port = "54321+" baseport = None # TODO: get rid of this port gimmick and have 2 separate parameters. if is_type(port, str): if port.isdigit(): port = int(port) else: if not(port[-1] == "+" and port[:-1].isdigit()): raise H2OValueError("`port` should be of the form 'DDDD+', where D is a digit. Got: %s" % port) baseport = int(port[:-1]) port = 0 hs = H2OLocalServer() hs._verbose = bool(verbose) hs._jar_path = hs._find_jar(jar_path) hs._extra_classpath = extra_classpath hs._ice_root = ice_root hs._name = name if not ice_root: hs._ice_root = tempfile.mkdtemp() hs._tempdir = hs._ice_root if verbose: print("Attempting to start a local H2O server...") hs._launch_server(port=port, baseport=baseport, nthreads=int(nthreads), ea=enable_assertions, mmax=max_mem_size, mmin=min_mem_size, jvm_custom_args=jvm_custom_args, bind_to_localhost=bind_to_localhost, log_dir=log_dir, log_level=log_level, max_log_file_size=max_log_file_size) if verbose: print(" Server is running at %s://%s:%d" % (hs.scheme, hs.ip, hs.port)) atexit.register(lambda: hs.shutdown()) return hs
[docs] def is_running(self): """Return True if the server process is still running, False otherwise.""" return self._process is not None and self._process.poll() is None
[docs] def shutdown(self): """ Shut down the server by trying to terminate/kill its process. First we attempt to terminate the server process gracefully (sending SIGTERM signal). However after _TIME_TO_KILL seconds if the process didn't shutdown, we forcefully kill it with a SIGKILL signal. """ if not self._process: return try: kill_time = time.time() + self._TIME_TO_KILL while self._process.poll() is None and time.time() < kill_time: self._process.terminate() time.sleep(0.2) if self._process().poll() is None: self._process.kill() time.sleep(0.2) if self._verbose: print("Local H2O server %s:%s stopped." % (self.ip, self.port)) except: pass self._process = None
@property def scheme(self): """Connection scheme, 'http' or 'https'.""" return self._scheme @property def ip(self): """IP address of the server.""" return self._ip @property def port(self): """Port that the server is listening to.""" return self._port @property def name(self): """H2O cluster name.""" return self._name #------------------------------------------------------------------------------------------------------------------- # Private #------------------------------------------------------------------------------------------------------------------- def __init__(self): """[Internal] please use H2OLocalServer.start() to launch a new server.""" self._scheme = None # "http" or "https" self._ip = None self._port = None self._name = None self._process = None self._verbose = None self._jar_path = None self._extra_classpath = None self._ice_root = None self._stdout = None self._stderr = None self._tempdir = None def _find_jar(self, path0=None): """ Return the location of an h2o.jar executable. :param path0: Explicitly given h2o.jar path. If provided, then we will simply check whether the file is there, otherwise we will search for an executable in locations returned by ._jar_paths(). :raises H2OStartupError: if no h2o.jar executable can be found. """ jar_paths = [path0] if path0 else self._jar_paths() searched_paths = [] for jp in jar_paths: searched_paths.append(jp) if os.path.exists(jp): return jp raise H2OStartupError("Cannot start local server: h2o.jar not found. Paths searched:\n" + "".join(" %s\n" % s for s in searched_paths)) @staticmethod def _jar_paths(): """Produce potential paths for an h2o.jar executable.""" # PUBDEV-3534 hook to use arbitrary h2o.jar own_jar = os.getenv("H2O_JAR_PATH", "") if own_jar != "": if not os.path.isfile(own_jar): raise H2OStartupError("Environment variable H2O_JAR_PATH is set to '%d' but file does not exists, unset environment variable or provide valid path to h2o.jar file." % own_jar) yield own_jar # Check if running from an h2o-3 src folder (or any subfolder), in which case use the freshly-built h2o.jar cwd_chunks = os.path.abspath(".").split(os.path.sep) for i in range(len(cwd_chunks), 0, -1): if cwd_chunks[i - 1] == "h2o-3": yield os.path.sep.join(cwd_chunks[:i] + ["build", "h2o.jar"]) # Then check the backend/bin folder: # (the following works assuming this code is located in h2o/backend/server.py file) backend_dir = os.path.split(os.path.realpath(__file__))[0] yield os.path.join(backend_dir, "bin", "h2o.jar") # Then try several old locations where h2o.jar might have been installed prefix1 = prefix2 = sys.prefix # On Unix-like systems Python typically gets installed into /Library/... or /System/Library/... If one of # those paths is sys.prefix, then we also build its counterpart. if prefix1.startswith(os.path.sep + "Library"): prefix2 = os.path.join("", "System", prefix1) elif prefix1.startswith(os.path.sep + "System"): prefix2 = prefix1[len(os.path.join("", "System")):] yield os.path.join(prefix1, "h2o_jar", "h2o.jar") yield os.path.join(os.path.abspath(os.sep), "usr", "local", "h2o_jar", "h2o.jar") yield os.path.join(prefix1, "local", "h2o_jar", "h2o.jar") yield os.path.join(get_config_var("userbase"), "h2o_jar", "h2o.jar") yield os.path.join(prefix2, "h2o_jar", "h2o.jar") def _launch_server(self, port, baseport, mmax, mmin, ea, nthreads, jvm_custom_args, bind_to_localhost, log_dir=None, log_level=None, max_log_file_size=None): """Actually start the h2o.jar executable (helper method for `.start()`).""" self._ip = "127.0.0.1" # Find Java and check version. (Note that subprocess.check_output returns the output as a bytes object) java = self._find_java() self._check_java(java, self._verbose) if self._verbose: print(" Starting server from " + self._jar_path) print(" Ice root: " + self._ice_root) # Combine jar path with the optional extra classpath classpath = [self._jar_path] if self._extra_classpath is None else [self._jar_path] + self._extra_classpath # Construct java command to launch the process cmd = [java] # ...add JVM options cmd += ["-ea"] if ea else [] for (mq, num) in [("-Xms", mmin), ("-Xmx", mmax)]: if num is None: continue numstr = "%dG" % (num >> 30) if num == (num >> 30) << 30 else \ "%dM" % (num >> 20) if num == (num >> 20) << 20 else \ str(num) cmd += [mq + numstr] if jvm_custom_args is not None: for arg in jvm_custom_args: assert type(arg) is str cmd += [arg] cmd += ["-cp", os.pathsep.join(classpath), "water.H2OApp"] # This should be the last JVM option # ...add H2O options cmd += ["-ip", self._ip] if bind_to_localhost: cmd += ["-web_ip", self._ip] cmd += ["-port", str(port)] if port else [] cmd += ["-baseport", str(baseport)] if baseport else [] cmd += ["-ice_root", self._ice_root] cmd += ["-nthreads", str(nthreads)] if nthreads > 0 else [] if log_dir: cmd += ["-log_dir", log_dir] if log_level: cmd += ["-log_level", log_level] if max_log_file_size: cmd += ["-max_log_file_size", max_log_file_size] if not self._name: self._name = "H2O_from_python_%s" % self._tmp_file("salt") cmd += ["-name", self._name] # Warning: do not change to any higher log-level, otherwise we won't be able to know which port the # server is listening to. cmd += ["-log_level", "INFO"] # Create stdout and stderr files self._stdout = self._tmp_file("stdout") self._stderr = self._tmp_file("stderr") cwd = os.path.abspath(os.getcwd()) out = open(self._stdout, "w") err = open(self._stderr, "w") if self._verbose: print(" JVM stdout: " + out.name) print(" JVM stderr: " + err.name) # Launch the process win32 = sys.platform == "win32" flags = getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0) if win32 else 0 prex = os.setsid if not win32 else None try: proc = subprocess.Popen(args=cmd, stdout=out, stderr=err, cwd=cwd, creationflags=flags, preexec_fn=prex) except OSError as e: traceback = getattr(e, "child_traceback", None) raise H2OServerError("Unable to start server: %s" % e, traceback) # Wait until the server is up-and-running giveup_time = time.time() + self._TIME_TO_START while True: if proc.poll() is not None: if proc.returncode == self._BAD_JAVA_VERSION_RETURN_CODE_: error_message = "Server process terminated because of unsupported Java version" else: error_message = "Server process terminated with error code %d" % proc.returncode if os.stat(self._stderr).st_size > 0: error_message += ": %s" % open(self._stderr).read() else: error_message += "." raise H2OServerError(error_message) ret = self._get_server_info_from_logs() if ret: self._scheme = ret[0] self._ip = ret[1] self._port = ret[2] self._process = proc break if time.time() > giveup_time: elapsed_time = time.time() - (giveup_time - self._TIME_TO_START) raise H2OServerError("Server wasn't able to start in %f seconds." % elapsed_time) time.sleep(0.2) @staticmethod def _check_java(java, verbose): jver_bytes = subprocess.check_output([java, "-version"], stderr=subprocess.STDOUT) jver = jver_bytes.decode(encoding="utf-8", errors="ignore") if verbose: print(" Java Version: " + jver.strip().replace("\n", "; ")) if "GNU libgcj" in jver: raise H2OStartupError("Sorry, GNU Java is not supported for H2O.\n" "Please download the latest 64-bit Java SE JDK from Oracle.") if "Client VM" in jver: warn(" You have a 32-bit version of Java. H2O works best with 64-bit Java.\n" " Please download the latest 64-bit Java SE JDK from Oracle.\n") H2OLocalServer._has_compatible_version(jver) @staticmethod def _has_compatible_version(java_version): pattern = re.compile("1\\.[1-7]\\.") if pattern.search(java_version): raise H2OStartupError("Your java is not supported: " + java_version.strip().replace("\n", "; ")) @staticmethod def _find_java(): """ Find location of the java executable (helper for `._launch_server()`). This method is not particularly robust, and may require additional tweaking for different platforms... :return: Path to the java executable. :raises H2OStartupError: if java cannot be found. """ java = "java.exe" if sys.platform == "win32" else "java" h2o_java = os.getenv("H2O_JAVA_HOME") if h2o_java: full_path = os.path.join(h2o_java, "bin", java) if not os.path.exists(full_path): raise H2OStartupError("Environment variable H2O_JAVA_HOME is set to '%d' but this location doesn't appear to be a valid Java Home directory, unset environment variable or provide valid path to Java Home." % h2o_java) return full_path # is java callable directly (doesn't work on windows it seems)? if os.access(java, os.X_OK): return java # Can Java be found on the PATH? for path in os.getenv("PATH").split(os.pathsep): # not same as os.path.sep! full_path = os.path.join(path, java) if os.access(full_path, os.X_OK): return full_path # check if JAVA_HOME is set (for Windows) if os.getenv("JAVA_HOME"): full_path = os.path.join(os.getenv("JAVA_HOME"), "bin", java) if os.path.exists(full_path): return full_path # check "/Program Files" and "/Program Files (x86)" on Windows if sys.platform == "win32": # On Windows, backslash on the drive letter is necessary, otherwise os.path.join produces an invalid path program_folders = [os.path.join("C:\\", "Program Files", "Java"), os.path.join("C:\\", "Program Files (x86)", "Java"), os.path.join("C:\\", "ProgramData", "Oracle", "Java")] for folder in program_folders: for dirpath, dirnames, filenames in os.walk(folder): if java in filenames: return os.path.join(dirpath, java) # not found... raise H2OStartupError("Cannot find Java. Please install the latest JRE from\n" "http://www.oracle.com/technetwork/java/javase/downloads/index.html") def _tmp_file(self, kind): """ Generate names for temporary files (helper method for `._launch_server()`). :param kind: one of "stdout", "stderr" or "salt". The "salt" kind is used for process name, not for a file, so it doesn't contain a path. All generated names are based on the user name of the currently logged-in user. """ if sys.platform == "win32": username = os.getenv("USERNAME") else: username = os.getenv("USER") if not username: username = "unknownUser" usr = "".join(ch if ch.isalnum() else "_" for ch in username) if kind == "salt": return usr + "_" + "".join(choice("0123456789abcdefghijklmnopqrstuvwxyz") for _ in range(6)) else: if not self._tempdir: self._tempdir = tempfile.mkdtemp() return os.path.join(self._tempdir, "h2o_%s_started_from_python.%s" % (usr, kind[3:])) def _get_server_info_from_logs(self): """ Check server's output log, and determine its scheme / IP / port (helper method for `._launch_server()`). This method is polled during process startup. It looks at the server output log and checks for a presence of a particular string ("INFO: Open H2O Flow in your web browser:") which indicates that the server is up-and-running. If the method detects this string, it extracts the server's scheme, ip and port and returns them; otherwise it returns None. :returns: (scheme, ip, port) tuple if the server has already started, None otherwise. """ searchstr = ": Open H2O Flow in your web browser:" with open(self._stdout, "rt") as f: for line in f: if searchstr in line: url = line[line.index(searchstr) + len(searchstr):].strip().rstrip("/") parts = url.split(":") assert len(parts) == 3 and (parts[0] == "http" or parts[1] == "https") and parts[2].isdigit(), \ "Unexpected URL: %s" % url return parts[0], parts[1][2:], int(parts[2]) return None def __enter__(self): return self def __exit__(self, *args): self.shutdown() assert len(args) == 3 # Avoid warning about unused args... return False # ensure that any exception will be re-raised
# Do not stop child process when the object is garbage collected! # This ensures that simple code such as # for _ in range(5): # h2o.H2OConnection.start() # will launch 5 servers, and they will not be closed down immediately (only when the program exits).