# Source code for h2o.expr

import h2o, frame, astfun
import math, collections, tabulate, urllib, gc, sys, copy


class ExprNode:
    """
    Composable Expressions: This module contains code for the lazy expression DAG.

    Execution Overview
    ------------------
    The job of ExprNode is to provide a layer of indirection to H2OFrame instances that
    are built of arbitrarily large, lazy expression DAGs. In order to do this job well,
    ExprNode must also track top-level entry points to such DAGs, maintain a sane amount
    of state to know which H2OFrame instances are temporary (or not), and maintain a
    cache of H2OFrame properties (nrows, ncols, types, names, few rows of data).

    Top-Level Entry Points
    ----------------------
    An expression is declared top-level if it
      A) computes and returns an H2OFrame to some on-demand call from somewhere, or
      B) is an H2OFrame instance with more referrers than the 4 needed for the usual
         flow of python execution (see MAGIC_REF_COUNT below for more details).

    Sane Amount of State
    --------------------
    Instances of H2OFrame live and die by the state contained in the _ex field. The three
    pieces of state -- _op, _children, _cache -- are the minimal state needed to
    unambiguously track temporary H2OFrame instances and prune them according to the
    usual scoping laws of python.

    If _cache._id is None, then this DAG has never been sent over to H2O, and there's
    nothing more to do when the object goes out of scope.

    If _cache._id is not None, then there has been some work done by H2O to compute the
    big data object sitting in H2O to which _id points. At the time that __del__ is
    called on this object, the decision to throw out the corresponding data in H2O or
    to keep it is made by the None'ness of _children.

    tl;dr:
      If _cache._id is not None and _children is None, then do not delete in H2O cluster
      If _cache._id is not None and _children is not None, then do delete in H2O cluster

    H2OCache
    --------
    To prevent several unnecessary REST calls and unnecessary execution, a few of the
    oft-needed attributes of the H2OFrame are cached for convenience. The primary
    consumers of these cached values are __getitem__, __setitem__, and a few other
    H2OFrame ops that do argument validation or exchange (e.g. colnames for indices).
    There are more details available under the H2OCache class declaration.
    """

    # Magical count-of-5: (get 2 more when looking at it in debug mode)
    # 2 for _do_it frame, 2 for _do_it local dictionary list, 1 for parent
    MAGIC_REF_COUNT = 5 if sys.gettrace() is None else 7  # M = debug ? 7 : 5

    def __init__(self, op="", *args):
        assert isinstance(op, str), op
        self._op = op  # Base opcode string
        # ast children; H2OFrame arguments are unwrapped to their ExprNode (_ex).
        # If not None and _cache._id is not None, the H2O-side result is a temp.
        self._children = tuple(a._ex if isinstance(a, frame.H2OFrame) else a for a in args)
        self._cache = H2OCache()  # ncols, nrows, names, types

    def _eager_frame(self):
        """Ensure this node is backed by H2O-side data and return self.

        If nothing is cached locally and no key exists yet, the DAG is evaluated as a
        top-level expression (which assigns a temp key in _do_it).
        """
        if not self._cache.is_empty():
            return self
        if self._cache._id is not None:
            return self  # Data already computed under ID, but not cached locally
        return self._eval_driver(True)

    def _eager_scalar(self):
        """Evaluate this node and return its scalar result (or a list of scalars).

        Returns the cached value directly when one is already present.
        """
        if not self._cache.is_empty():
            assert self._cache.is_scalar()
            # Return the cached value itself (not this node), matching the
            # freshly-evaluated path below.
            return self._cache._data
        assert self._cache._id is None
        self._eval_driver(False)
        assert self._cache._id is None
        assert self._cache.is_scalar()
        return self._cache._data

    def _eval_driver(self, top):
        """Send this DAG to H2O via h2o.rapids and record the result in the cache.

        'scalar' and 'string' results are stored in _cache._data; a 'key' (frame)
        result records num_rows/num_cols. Function results ('funstr') are not
        supported and raise NotImplementedError. Returns self.
        """
        exec_str = self._do_it(top)
        res = h2o.rapids(exec_str)
        if 'scalar' in res:
            self._cache._data = res['scalar']
        if 'string' in res:
            self._cache._data = res['string']
        if 'funstr' in res:
            raise NotImplementedError
        if 'key' in res:
            self._cache.nrows = res['num_rows']
            self._cache.ncols = res['num_cols']
        return self

    # Recursively build a rapids execution string. Any object with at least
    # MAGIC_REF_COUNT referrers (or any top-level call) will be cached as a temp
    # until the next client GC cycle - consuming memory. Do Not Call This except
    # when you need to do some other cluster operation on the evaluated object.
    # Examples might be: lazy dataset time parse vs changing the global timezone.
    # Global timezone change is eager, so the time parse has to occur in the
    # correct order relative to the timezone change, so cannot be lazy.
    # NOTE: do not restructure this method or its callers; the gc referrer count
    # below depends on the exact frames/locals holding `self` on the call stack.
    def _do_it(self, top):
        if not self._cache.is_empty():
            # Data already computed and cached; could be a "false-like" cached value
            # (hence the is_empty() test rather than a truthiness test).
            return str(self._cache._data) if self._cache.is_scalar() else self._cache._id
        if self._cache._id is not None:
            return self._cache._id  # Data already computed under ID, but not cached
        # assert isinstance(self._children,tuple)
        exec_str = "({} {})".format(self._op, " ".join([ExprNode._arg_to_expr(ast) for ast in self._children]))
        gc_ref_cnt = len(gc.get_referrers(self))
        if top or gc_ref_cnt >= ExprNode.MAGIC_REF_COUNT:
            self._cache._id = frame._py_tmp_key()
            exec_str = "(tmp= {} {})".format(self._cache._id, exec_str)
        return exec_str

    @staticmethod
    def _arg_to_expr(arg):
        """Render a single AST argument as rapids syntax.

        Raises ValueError for unsupported argument types.
        """
        if isinstance(arg, ExprNode):
            return arg._do_it(False)
        elif isinstance(arg, astfun.ASTId):
            return str(arg)
        elif isinstance(arg, bool):
            return "{}".format("TRUE" if arg else "FALSE")
        elif isinstance(arg, (int, long, float)):
            return "{}".format("NaN" if math.isnan(arg) else arg)
        elif isinstance(arg, basestring):
            return '"' + arg + '"'
        elif isinstance(arg, slice):
            # Rendered as [start:count]; a missing stop is sent as NaN
            # (presumably "to the end" on the H2O side -- confirm).
            return "[{}:{}]".format(0 if arg.start is None else arg.start,
                                    "NaN" if (arg.stop is None or math.isnan(arg.stop)) else (arg.stop) if arg.start is None else (arg.stop - arg.start))
        elif isinstance(arg, list):
            # all() is vacuously True for [], so require a non-empty list for the
            # string form; otherwise an empty list would render as '[""]'.
            if arg and all(isinstance(i, basestring) for i in arg):
                return "[\"" + "\" \"".join(arg) + "\"]"
            return "[" + " ".join(["NaN" if i == 'NaN' or math.isnan(i) else str(i) for i in arg]) + "]"
        elif arg is None:
            return "[]"  # empty list
        raise ValueError("Unexpected arg type: " + str(type(arg)) + " " + str(arg.__class__) + " " + arg.__repr__())

    def __del__(self):
        # Only temps (an H2O key AND non-None _children) are removed from the cluster.
        if self._cache._id is not None and self._children is not None:
            h2o.rapids("(rm {})".format(self._cache._id))

    @staticmethod
    def _collapse_sb(sb):
        """Join string pieces into one line: drop newlines, squeeze whitespace, tidy ' )'."""
        return ' '.join("".join(sb).replace("\n", "").split()).replace(" )", ")")

    def _debug_print(self, pprint=True):
        """Return the DAG as an indented multi-line string, or a single collapsed line."""
        return "".join(self._2_string(sb=[])) if pprint else ExprNode._collapse_sb(self._2_string(sb=[]))

    def _to_string(self):
        """Return a flat "(op arg ...)" rendering of this node's rapids expression."""
        # _children may legitimately be None (see class docstring); render no args then.
        return ' '.join(["(" + self._op] + [ExprNode._arg_to_expr(a) for a in (self._children or ())] + [")"])

    def _2_string(self, depth=0, sb=None):
        """Append an indented, multi-line rendering of this DAG to ``sb`` and return it."""
        if sb is None:
            sb = []
        sb += ['\n', " " * depth, "(" + self._op, " "]
        if self._children is not None:
            for child in self._children:
                if isinstance(child, h2o.H2OFrame) and child._ex._cache._id is None:
                    child._ex._2_string(depth + 2, sb)
                elif isinstance(child, h2o.H2OFrame):
                    sb += ['\n', ' ' * (depth + 2), child._ex._cache._id]
                elif isinstance(child, ExprNode):
                    child._2_string(depth + 2, sb)
                else:
                    sb += ['\n', ' ' * (depth + 2), str(child)]
        sb += ['\n', ' ' * depth + ") "] + ['\n'] * (depth == 0)  # add a \n if depth == 0
        return sb

    def __repr__(self):
        return "Expr(op=%r,id=%r,ast=%r,is_scalar=%r)" % (self._op, self._cache._id, self._children, self._cache.is_scalar())
class H2OCache(object):
    """Client-side cache of H2OFrame properties, used to avoid repeated REST calls.

    Holds the frame key (_id), row/column counts, column names and types, and in
    _data either an OrderedDict of cached columns (keyed by column label) or a scalar.
    """

    def __init__(self):
        self._id = None
        self._nrows = -1     # -1 until known (see nrows_valid)
        self._ncols = -1     # -1 until known (see ncols_valid)
        self._types = None   # col types: dict of column name -> type
        self._names = None   # col names
        self._data = None    # ordered dict of cached rows, or a scalar
        self._l = 0          # nrows cached

    @property
    def nrows(self):
        return self._nrows

    @nrows.setter
    def nrows(self, value):
        self._nrows = value

    def nrows_valid(self):
        return self._nrows >= 0

    @property
    def ncols(self):
        return self._ncols

    @ncols.setter
    def ncols(self, value):
        self._ncols = value

    def ncols_valid(self):
        return self._ncols >= 0

    @property
    def names(self):
        return self._names

    @names.setter
    def names(self, value):
        self._names = value

    def names_valid(self):
        return self._names is not None

    @property
    def types(self):
        return self._types

    @types.setter
    def types(self, value):
        self._types = value

    def types_valid(self):
        return self._types is not None

    @property
    def scalar(self):
        return self._data if self.is_scalar() else None

    @scalar.setter
    def scalar(self, value):
        self._data = value

    def __len__(self):
        return self._l

    def is_empty(self):
        return self._data is None

    def is_scalar(self):
        # True whenever _data is not a dict -- note this includes the empty (None) case.
        return not isinstance(self._data, dict)

    def is_valid(self):
        return (self._id is not None and
                not self.is_empty() and
                self.nrows_valid() and
                self.ncols_valid() and
                self.names_valid() and
                self.types_valid())

    def fill(self, rows=10):
        """Fetch frame metadata and the first ``rows`` rows from H2O into this cache.

        No-op when at least ``rows`` rows are already cached.
        """
        assert self._id is not None
        if self._data is not None and rows <= len(self):
            return
        res = h2o.H2OConnection.get_json("Frames/" + urllib.quote(self._id), row_count=rows)["frames"][0]
        self._l = rows
        self._nrows = res["rows"]
        self._ncols = res["total_column_count"]
        self._names = [c["label"] for c in res["columns"]]
        self._types = dict(zip(self._names, [c["type"] for c in res["columns"]]))
        self._fill_data(res)

    def _fill_data(self, json):
        """Populate _data from a Frames JSON payload; mutates the column dicts in place."""
        self._data = collections.OrderedDict()
        for c in json["columns"]:
            # Drop redundant metadata if present (tolerate its absence).
            c.pop('__meta', None)               # Redundant description ColV3
            c.pop('domain_cardinality', None)   # Same as len(c['domain'])
            sdata = c.pop('string_data', None)
            if sdata:
                c['data'] = sdata  # Only use data field; may contain either [str] or [real]
            else:
                # Data (not string) columns should not have a string in them. However,
                # our NaNs are encoded as string literals "NaN" as opposed to the bare
                # token NaN, so the default python json decoder does not convert them
                # to math.nan. Do that now.
                c['data'] = [float('nan') if x == "NaN" else x for x in c['data']]
            self._data[c.pop('label')] = c  # Label used as the Key
        return self

    #### pretty printing ####
    def _tabulate(self, tablefmt, rollups):
        """Pretty tabulated string of all the cached data, and column names."""
        if not self.is_valid():
            self.fill()
        d = collections.OrderedDict()
        if rollups:
            # Full row-header: rollup labels followed by the cached row indices. The row
            # count comes from a sample column; a frame with no columns has no rows.
            lrows = len(next(iter(self._data.values()))['data']) if self._data else 0
            d[""] = ["type", "mins", "mean", "maxs", "sigma", "zeros", "missing"] + [str(i) for i in range(lrows)]
        for k, v in self._data.items():
            x = v['data']        # Data to display
            domain = v['domain']  # Map to cat strings as needed
            if domain:
                x = ["" if math.isnan(idx) else domain[int(idx)] for idx in x]
            if rollups:  # Rollups, if requested
                mins = v['mins'][0] if v['mins'] else None
                maxs = v['maxs'][0] if v['maxs'] else None
                x = [v['type'], mins, v['mean'], maxs, v['sigma'], v['zero_count'], v['missing_count']] + x
            d[k] = x  # Insert into ordered-dict
        return tabulate.tabulate(d, headers="keys", tablefmt=tablefmt)

    def flush(self):
        """Reset everything but the frame id; returns self."""
        fr_id = self._id
        self.__dict__ = H2OCache().__dict__.copy()
        self._id = fr_id
        return self

    def fill_from(self, cache):
        """Deep-copy metadata from another cache, keeping this id and dropping cached rows."""
        assert isinstance(cache, H2OCache)
        cur_id = self._id
        self.__dict__ = copy.deepcopy(cache.__dict__)
        self._data = None
        self._l = 0  # no rows are cached any more, so the cached-row count must agree
        self._id = cur_id

    def dummy_fill(self):
        """Mark this cache as a valid, empty frame under the placeholder id "dummy"."""
        self._id = "dummy"
        self._nrows = 0
        self._ncols = 0
        self._names = []
        self._types = {}
        self._data = {}