Source code for h2o.expr

"""
This module contains code for the lazy expression DAG.
"""

import sys
from math import sqrt, isnan, floor
import h2o
import frame
import tabulate

__CMD__ = None
__TMPS__ = None


[docs]class Expr(object): """ Expr objects have a few different flavors: 1. A pending to-be-computed BigData expression. Does _NOT_ have a Key 2. An already computed BigData expression. Does have a key 3. A small-data computation, pending or not. Pending computations point to other Expr objects in a DAG of pending computations. Pointed at by at most one H2OVec (during construction) and no others. If that H2OVec goes dead, this computation is known to be an internal temp, used only in building other Expr objects. """ def __init__(self, op, left=None, rite=None, length=None): """ Create a new Expr object. Constructor choices: ("op" left rite): pending calc, awaits left & rite being computed ("op" None None): precomputed local small data (fr_key #num name): precomputed remote Big Data :param op: An operation to perform :param left: An Expr to the "left" :param rite: An Expr to the "right" :param length: The length of the H2OVec/H2OFrame object. :return: A new Expr object. """ # instance variables self._op = None self._data = None # the "head" of a frame, or a float, or int self._left = None self._rite = None self._name = None self._summary = None # computed lazily self._len = None self._vecname = "" # the name of the Vec, if any self._isslice = False # if a slice, then return a new H2OVec from show self._op, self._data = (op, None) if isinstance(op, str) else ("rawdata", op) self._name = self._op # Set an initial name, generally overwritten assert self._is_valid(), str(self._name) + str(self._data) self._left = left._expr if isinstance(left, frame.H2OVec) else left self._rite = rite._expr if isinstance(rite, frame.H2OVec) else rite assert self._left is None or self._left._is_valid(), self._left.debug() assert self._rite is None or self._rite._is_valid(), self._rite.debug() # Compute length eagerly if self.is_remote(): # Length must be provided for remote data assert length is not None self._len = length elif self.is_local(): # Local data, grab length by inspection self._len = len(self._data) if isinstance(self._data, list) else 1 elif self.is_slice(): self._len = self._data.stop - self._data.start else: self._len = length if length else len(self._left) assert self._len is not None if left and isinstance(left, frame.H2OVec): self._vecname = left._name
[docs] def name(self): return self._name
[docs] def op(self): return self._op
[docs] def set_len(self, i): self._len = i
[docs] def get_len(self): return self._len
[docs] def data(self): return self._data
[docs] def left(self): return self._left
[docs] def rite(self): return self._rite
[docs] def vecname(self): return self._vecname
[docs] def is_local(self): return isinstance(self._data, (list, int, float))
[docs] def is_remote(self): return isinstance(self._data, unicode)
[docs] def is_pending(self): return self._data is None
[docs] def is_computed(self): return not self.is_pending()
[docs] def is_slice(self): return isinstance(self._data, slice)
def _is_valid(self): return self.is_local() or self.is_remote() or self.is_pending() or self.is_slice() def __len__(self): """ The length of this H2OVec/H2OFrame (generally without triggering eager evaluation) :return: The number of columns/rows of the H2OFrame/H2OVec. """ return self._len
[docs] def debug(self): """ :return: The structure of this object without evaluating. """ return ("(" + self._name + " <== " + str(self._left._name if isinstance(self._left, Expr) else self._left) + " " + self._op + " " + str(self._rite._name if isinstance(self._rite, Expr) else self._rite) + " ==> " + str(type(self._data)) + ")")
[docs] def show(self, noprint=False): """ Evaluate and print. :return: """ self.eager() if noprint: if isinstance(self._data, unicode): j = h2o.frame(self._data) data = j['frames'][0]['columns'][0]['data'][0:10] return data return self._data else: if isinstance(self._data, unicode): j = h2o.frame(self._data) data = j['frames'][0]['columns'][0]['data'][0:10] elif isinstance(self._data, int): print self._data return else: data = [self._data] header = self._vecname + " (first " + str(len(data)) + " row(s))" rows = range(1, len(data) + 1, 1) print tabulate.tabulate(zip(rows, data), headers=["Row ID", header]) print # def __repr__(self): # self.show() # return "" # Compute summary data
[docs] def summary(self): self.eager() if self.is_local(): x = self._data[0] t = 'int' if isinstance(x, int) else ( 'enum' if isinstance(x, str) else 'real') mins = [min(self._data)] maxs = [max(self._data)] n = len(self._data) mean = sum(self._data) / n if t != 'enum' else None ssq = 0 zeros = 0 missing = 0 for x in self._data: delta = x - mean if t != 'enum': ssq += delta * delta if x == 0: zeros += 1 if x is None or (t != 'enum' and isnan(x)): missing += 1 stddev = sqrt(ssq / (n - 1)) if t != 'enum' else None return {'type': t, 'mins': mins, 'maxs': maxs, 'mean': mean, 'sigma': stddev, 'zeros': zeros, 'missing': missing} if self._summary: return self._summary j = h2o.frame(self._data) self._summary = j['frames'][0]['columns'][0] return self._summary # Basic indexed or sliced lookup
def __getitem__(self, i): x = self.eager() if self.is_local(): return x[i] if not isinstance(i, int): raise NotImplementedError # need a bigdata slice here # ([ %vec #row #0) #j = H2OCONN.Rapids("([ %"+str(self._data)+" #"+str(i)+" #0)") #return j['scalar'] raise NotImplementedError # Small-data add; result of a (lazy but small) Expr vs a plain int/float def __add__(self, i): return self.eager() + i def __radd__(self, i): return self + i # Add is commutative def __del__(self): # Dead pending op or local data; nothing to delete if self.is_pending() or self.is_local(): return assert self.is_remote(), "Data wasn't remote. Hrm..." global __CMD__ if __CMD__ is None: h2o.remove(self._data) else: s = " (del '" + self._data + "' #0)" global __TMPS__ if __TMPS__ is None: print "Lost deletes: ", s else: __TMPS__ += s
[docs] def eager(self): """ This forces a top-level execution, as needed, and produces a top-level result locally. Frames are returned and truncated to the standard preview response provided by rapids - 100 rows X 200 cols. :return: A key pointing to the big data object """ if self.is_computed(): return self._data # Gather the computation path for remote work, or doit locally for local work global __CMD__, __TMPS__ assert not __CMD__ and not __TMPS__ __CMD__ = "" __TMPS__ = "" # Begin gathering rapids commands self._do_it() # Symbolically execute the command cmd = __CMD__ tmps = __TMPS__ # Stop gathering rapids commands __CMD__ = None __TMPS__ = None if self.is_local(): return self._data # Local computation, all done # Remote computation - ship Rapids over wire, assigning key to result if tmps: cmd = "(, " + cmd + tmps + ")" j = h2o.rapids(cmd) if isinstance(self._data, unicode): pass # Big Data Key is the result # Small data result pulled locally else: self._data = j['head'] if j['num_rows'] else j['scalar'] return self._data
def _do_child(self, child): assert child is None or isinstance(child, Expr), " expected None or Expr but found: %r" % child global __CMD__ if child: if child.is_pending(): child._do_it() elif isinstance(child._data, (int, float)): __CMD__ += "#" + str(child._data) elif isinstance(child._data, unicode): __CMD__ += "'" + str(child._data) + "'" elif isinstance(child._data, slice): __CMD__ += \ "(: #" + str(child._data.start) + " #" + str(child._data.stop - 1) \ + ")" child._data = None # trigger GC now else: pass __CMD__ += " " return child def _do_it(self): """ External API for eager; called by all top-level demanders (e.g. print) This may trigger (recursive) big-data evaluation. :return: None """ if self.is_computed(): return # Slice assignments are a 2-nested deep structure, returning the left-left # vector. Must fetch it now, before it goes dead after eval'ing the slice. # Shape: (= ([ %vec bool_slice_expr) vals_to_assign) # Need to fetch %vec out assign_vec = self._left._left if self._op == "=" and self._left._op == "[" else None # See if this is not a temp and not a scalar; if so it needs a name # Remove one count for the call to getrefcount itself cnt = sys.getrefcount(self) - 1 # Magical count-of-4 is the depth of 4 interpreter stack py_tmp = cnt != 4 and self._len > 1 and not assign_vec global __CMD__ if py_tmp: self._data = frame.H2OFrame.py_tmp_key() # Top-level key/name assignment __CMD__ += "(= !" + self._data + " " if self._op != ",": # Comma ops curry in-place (just gather args) __CMD__ += "(" + self._op + " " left = self._do_child(self._left) # down the left rite = self._do_child(self._rite) # down the right # eventually will need to create dedicated objects each overriding a "set_data" method # live with this "switch" for now # Do not try/catch NotImplementedError - it blows the original stack trace # so then you can't see what's not implemented if self._op in ["+", "&", "-", "*", "/", "==", "<", ">", ">=", "<="]: # in self.BINARY_INFIX_OPS: # num op num # num op [] if isinstance(left._data, (int, float)): if isinstance(rite._data, (int, float)): self._data = eval("left " + self._op + " rite") elif rite.is_local(): self._data = eval("[left "+ self._op + " x for x in rite._data]") else: pass # [] op num elif isinstance(rite._data, (int, float)): if left.is_local(): self._data = eval("[x" + self._op + " rite for x in left._data]") else: pass # [] op [] else: if left.is_local() and rite.is_local(): self._data = eval("[x " + self._op + " y for x, y in zip(left._data, rite._data)]") elif (left.is_remote() or left._data is None) and \ (rite.is_remote() or rite._data is None): pass else: raise NotImplementedError elif self._op == "[": # [] = [] if left.is_local(): self._data = left._data[rite._data] # all rows / columns ([ %fr_key "null" ()) / ([ %fr_key () "null") else: __CMD__ += ' "null"' elif self._op == "=": if left.is_local(): raise NotImplementedError else: if rite is None: __CMD__ += "#NaN" elif self._op == "floor": if left.is_local(): self._data = [floor(x) for x in left._data] else: pass elif self._op == "month": if left.is_local(): raise NotImplementedError else: pass elif self._op == "dayOfWeek": if left.is_local(): raise NotImplementedError else: pass elif self._op == "mean": if left.is_local(): self._data = sum(left._data) / len(left._data) else: __CMD__ += " #0 %TRUE" # Rapids mean extra args (trim=0, rmNA=TRUE) elif self._op in ["as.factor", "h2o.runif", "is.na"]: if left.is_local(): self._data = map(str, left._data) else: pass elif self._op == "quantile": if left.is_local(): raise NotImplementedError else: rapids_series = "{"+";".join([str(x) for x in rite._data])+"}" __CMD__ += rapids_series + " %FALSE #7" elif self._op == "mktime": if left.is_local(): raise NotImplementedError else: pass elif self._op == ",": pass else: raise NotImplementedError(self._op) # End of expression... wrap up parens if self._op != ",": __CMD__ += ")" if py_tmp: __CMD__ += ")" # Free children expressions; might flag some subexpresions as dead self._left = None # Trigger GC/ref-cnt of temps self._rite = None # Keep LHS alive if assign_vec: #if assign_vec._op != "rawdata": # Need to roll-up nested exprs # print assign_vec.debug() # print assign_vec._data,assign_vec._left,assign_vec._rite # raise NotImplementedError self._left = assign_vec self._data = assign_vec._data