Source code for h2o.frame

# -*- coding: utf-8 -*-
# import numpy    (disabled: no numpy because of Windows)
import collections, csv, itertools, os, re, tempfile, uuid, copy
import h2o
from connection import H2OConnection
from expr import Expr


[docs]class H2OFrame: def __init__(self, python_obj=None, local_fname=None, remote_fname=None, vecs=None, text_key=None): """ Create a new H2OFrame object by passing a file path or a list of H2OVecs. If `remote_fname` is not None, then a REST call will be made to import the data specified at the location `remote_fname`. This path is relative to the H2O cluster, NOT the local Python process If `local_fname` is not None, then the data is not imported into the H2O cluster at the time of object creation. If `python_obj` is not None, then an attempt to upload the python object to H2O will be made. A valid python object has type `list`, or `dict`. For more information on the structure of the input for the various native python data types ("native" meaning non-H2O), please see the general documentation for this object. :param python_obj: A "native" python object - list, dict, tuple. :param local_fname: A local path to a data source. Data is python-process-local. :param remote_fname: A remote path to a data source. Data is cluster-local. :param vecs: A list of H2OVec objects. :param text_key: A raw key resulting from an upload_file. :return: An instance of an H2OFrame object. """ self.local_fname = local_fname self.remote_fname = remote_fname self._vecs = None if python_obj is not None: # avoids the truth value of an array is ambiguous err self._upload_python_object(python_obj) return # Import the data into H2O cluster if remote_fname: rawkey = h2o.import_file(remote_fname) setup = h2o.parse_setup(rawkey) parse = h2o.parse(setup, H2OFrame.py_tmp_key()) # create a new key veckeys = parse['vec_ids'] rows = parse['rows'] cols = parse['column_names'] if parse["column_names"] else ["C" + str(x) for x in range(1,len(veckeys)+1)] self._vecs = H2OVec.new_vecs(zip(cols, veckeys), rows) thousands_sep = h2o.H2ODisplay.THOUSANDS if isinstance(remote_fname, str): print "Imported ", remote_fname, ". 
Parsed {} rows and {} cols".format(thousands_sep.format(rows), thousands_sep.format(len(cols))) else: h2o.H2ODisplay([["File"+str(i+1),f] for i,f in enumerate(remote_fname)],None, "Parsed {} rows and {} cols".format(thousands_sep.format(rows), thousands_sep.format(len(cols)))) # Read data locally into python process elif local_fname: with open(local_fname, 'rb') as csvfile: self._vecs = [] for name in csvfile.readline().split(','): self._vecs.append(H2OVec(name.rstrip(), Expr([]))) for row in csv.reader(csvfile): for i, data in enumerate(row): self._vecs[i].append(data) print "Imported", local_fname, "into local python process" # Construct from an array of Vecs already passed in elif vecs: vlen = len(vecs[0]) for v in vecs: if not isinstance(v, H2OVec): raise ValueError("Not a list of Vecs") if len(v) != vlen: raise ValueError("Vecs not the same size: " + str(vlen) + " != " + str(len(v))) self._vecs = vecs elif text_key: self._handle_text_key(text_key, None) else: raise ValueError("Frame made from CSV file or an array of Vecs only") def _upload_python_object(self, python_obj): """ Properly handle native python data types. For a discussion of the rules and permissible data types please refer to the main documentation for H2OFrame. :param python_obj: A tuple, list, dict, collections.OrderedDict :return: None """ # [] and () cases -- folded together since H2OFrame is mutable if isinstance(python_obj, (list, tuple)): header, data_to_write = H2OFrame._handle_python_lists(python_obj) # {} and collections.OrderedDict cases elif isinstance(python_obj, (dict, collections.OrderedDict)): header, data_to_write = H2OFrame._handle_python_dicts(python_obj) # handle a numpy.ndarray # elif isinstance(python_obj, numpy.ndarray): # # header, data_to_write = H2OFrame._handle_numpy_array(python_obj) else: raise ValueError("`python_obj` must be a tuple, list, dict, collections.OrderedDict. 
Got: " + str(type(python_obj))) if header is None or data_to_write is None: raise ValueError("No data to write") # ## write python data to file and upload # # create a temporary file that will be written to tmp_handle,tmp_path = tempfile.mkstemp(suffix=".csv") tmp_file = os.fdopen(tmp_handle,'wb') # create a new csv writer object thingy csv_writer = csv.DictWriter(tmp_file, fieldnames=header, restval=None, dialect="excel", extrasaction="ignore", delimiter=",") csv_writer.writeheader() # write the header csv_writer.writerows(data_to_write) # write the data tmp_file.close() # close the streams self._upload_raw_data(tmp_path, header) # actually upload the data to H2O os.remove(tmp_path) # delete the tmp file def _handle_text_key(self, text_key, column_names): """ Handle result of upload_file :param test_key: A key pointing to raw text to be parsed :return: Part of the H2OFrame constructor. """ # perform the parse setup setup = h2o.parse_setup(text_key) # blocking parse, first line is always a header (since "we" wrote the data out) parse = h2o.parse(setup, H2OFrame.py_tmp_key(), first_line_is_header=1) # a hack to get the column names correct since "parse" does not provide them cols = parse['column_names'] if parse["column_names"] else ["C" + str(x) for x in range(1,len(parse['vec_ids'])+1)] # set the rows rows = parse['rows'] # set the vector keys veckeys = parse['vec_ids'] # create a new vec[] array self._vecs = H2OVec.new_vecs(zip(cols, veckeys), rows) # print some information on the *uploaded* data print "Uploaded", text_key, "into cluster with", rows, "rows and", len(cols), "cols" def _upload_raw_data(self, tmp_file_path, column_names): # file upload info is the normalized path to a local file fui = {"file": os.path.abspath(tmp_file_path)} # create a random name for the data dest_key = H2OFrame.py_tmp_key() # do the POST -- blocking, and "fast" (does not real data upload) H2OConnection.post_json("PostFile", fui, destination_frame=dest_key) # actually parse the 
data and setup self._vecs self._handle_text_key(dest_key, column_names) def __iter__(self): """ Allows for list comprehensions over an H2OFrame :return: An iterator over the H2OFrame """ if self._vecs is None or self._vecs == []: raise ValueError("Frame Removed") return (vec for vec in self._vecs.__iter__() if vec is not None)
def vecs(self):
    """
    Retrieve the array of H2OVec objects comprising this H2OFrame.

    :return: The array of H2OVec objects.
    :raises ValueError: If the frame has been removed.
    """
    columns = self._vecs
    if columns is None or columns == []:
        raise ValueError("Frame Removed")
    return columns
def keys(self):
    """
    Retrieve the keys for each of the H2OVec objects comprising this H2OFrame.

    :return: the array of keys.
    :raises ValueError: If the frame has been removed (same guard as the other
                        accessors, instead of a TypeError from iterating None).
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    return [vec.key() for vec in self._vecs]
def col_names(self):
    """
    Retrieve the column names (one name per H2OVec) for this H2OFrame.

    :return: A character list[] of column names.
    :raises ValueError: If the frame has been removed.
    """
    columns = self._vecs
    if columns is None or columns == []:
        raise ValueError("Frame Removed")
    return [column._name for column in columns]
def names(self):
    """
    Retrieve the column names (one name per H2OVec) for this H2OFrame.

    Delegates to `col_names` after checking that the frame still exists.

    :return: A character list[] of column names.
    :raises ValueError: If the frame has been removed.
    """
    columns = self._vecs
    if columns is None or columns == []:
        raise ValueError("Frame Removed")
    return self.col_names()
def nrow(self):
    """
    Get the number of rows in this H2OFrame.

    The count is the length of the first column.

    :return: The number of rows in this dataset.
    :raises ValueError: If the frame has been removed.
    """
    columns = self._vecs
    if columns is None or columns == []:
        raise ValueError("Frame Removed")
    first_column = columns[0]
    return len(first_column)
def ncol(self):
    """
    Get the number of columns in this H2OFrame.

    :return: The number of columns in this H2OFrame (the frame's own length).
    :raises ValueError: If the frame has been removed.
    """
    columns = self._vecs
    if columns is None or columns == []:
        raise ValueError("Frame Removed")
    return len(self)
def filterNACols(self, frac=0.2):
    """
    Filter columns with proportion of NAs >= frac.

    :param frac: Fraction of NAs in the column.
    :return: A list of column indices.
    """
    fr = self.send_frame()
    try:
        res = h2o.rapids("(filterNACols %{} #{})".format(fr, str(frac)))
        indices = res["head"][0]
    finally:
        # the temporary frame was created by send_frame; drop it even if the
        # rapids call or the response lookup fails
        h2o.removeFrameShallow(fr)
    # values are converted through float first, then truncated to int
    return [int(float(i)) for i in indices]
def dim(self):
    """
    Get the number of rows and columns in the H2OFrame.

    :return: The number of rows and columns in the H2OFrame as a list [rows, cols].
    :raises ValueError: If the frame has been removed.
    """
    columns = self._vecs
    if columns is None or columns == []:
        raise ValueError("Frame Removed")
    row_count = self.nrow()
    col_count = self.ncol()
    return [row_count, col_count]

# Print [col, cols...]
[docs] def show(self): if self._vecs is None or self._vecs == []: raise ValueError("Frame Removed") else: if len(self) == 1: to_show = [[v] for v in self._vecs[0].show(noprint=True)] h2o.H2ODisplay(to_show,self.names()) else: vecs = [vec.show(noprint=True) for vec in self] # vecs = self._vecs l=1 if isinstance(vecs[0], float): vecs.insert(0,1) print "Displaying " + str(l) + " row(s):" vecs = [[v] for v in vecs] h2o.H2ODisplay(zip(*vecs),["Row ID"]+self.names()) else: l = len(vecs[0]) vecs.insert(0, range(1, len(vecs[0])+1, 1)) print "Displaying " + str(l) + " row(s):" h2o.H2ODisplay(zip(*vecs),["Row ID"]+self.names())
[docs] def head(self, rows=10, cols=200, **kwargs): """ Analgous to R's `head` call on a data.frame. Display a digestible chunk of the H2OFrame starting from the beginning. :param rows: Number of rows to display. :param cols: Number of columns to display. :param kwargs: Extra arguments passed from other methods. :return: None """ if self._vecs is None or self._vecs == []: raise ValueError("Frame Removed") nrows = min(self.nrow(), rows) ncols = min(self.ncol(), cols) colnames = self.names()[0:ncols] fr = H2OFrame.py_tmp_key() cbind = "(, (gput " + fr + " (cbind %FALSE %" cbind += " %".join([vec._expr.eager() for vec in self]) + ")))" res = h2o.rapids(cbind) h2o.removeFrameShallow(fr) head_rows = [range(1, nrows + 1, 1)] head_rows += [rows[0:nrows] for rows in res["head"][0:ncols]] head = zip(*head_rows) print "First", str(nrows), "rows and first", str(ncols), "columns: " h2o.H2ODisplay(head,["Row ID"]+self.names())
[docs] def tail(self, rows=10, cols=200, **kwargs): """ Analgous to R's `tail` call on a data.frame. Display a digestible chunk of the H2OFrame starting from the end. :param rows: Number of rows to display. :param cols: Number of columns to display. :param kwargs: Extra arguments passed from other methods. :return: None """ if self._vecs is None or self._vecs == []: raise ValueError("Frame Removed") nrows = min(self.nrow(), rows) ncols = min(self.ncol(), cols) colnames = self.names()[0:ncols] vecs = [self[c][(self.nrow()-nrows):(self.nrow())] for c in range(ncols)] print "Last", str(nrows), "rows and first", str(ncols), "columns: " if nrows != 1: fr = H2OFrame.py_tmp_key() cbind = "(, (gput " + fr + " (cbind %FALSE %" cbind += " %".join([vec._expr.eager() for vec in vecs]) + ")))" res = h2o.rapids(cbind) h2o.removeFrameShallow(fr) tail_rows = [range(self.nrow()-nrows+1, self.nrow() + 1, 1)] tail_rows += [rows[0:nrows] for rows in res["head"][0:ncols]] tail = zip(*tail_rows) h2o.H2ODisplay(tail,["Row ID"]+self.names()) else: h2o.H2ODisplay([[self.nrow()] + [expr.eager() for expr in exprs]], ["Row ID"] + colnames)
def levels(self, col=0):
    """
    Get the factor levels for this frame and the specified column index.

    An index below zero is treated as 0 and an index past the end is treated as
    the last column.

    :param col: A column index in this H2OFrame.
    :return: a list of strings that are the factor levels for the column.
    :raises ValueError: If the frame has been removed.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    # clamp the index into [0, ncol - 1]
    col = min(max(col, 0), self.ncol() - 1)
    chosen = self._vecs[col]
    endpoint = "Frames/{}/columns/{}/domain".format(chosen._expr.eager(), "C1")
    response = H2OConnection.get_json(endpoint)
    return response["domain"][0]
def setNames(self, names):
    """
    Change the column names to `names`.

    :param names: A list of strings equal to the number of columns in the H2OFrame.
    :return: None. Rename the column names in this H2OFrame.
    :raises ValueError: If the frame has been removed, `names` is empty or not a
                        list, has the wrong length, or holds a non-string entry.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    if not (names and isinstance(names, list)):
        raise ValueError("names parameter must be a list of strings")
    if len(names) != self.ncol():
        raise ValueError("names parameter must be a list of ncol names")
    if any(not isinstance(candidate, str) for candidate in names):
        raise ValueError("all names in names parameter must be strings")
    # every check passed: rename the columns positionally
    for position, new_name in enumerate(names):
        self._vecs[position]._name = new_name
[docs] def describe(self): """ Generate an in-depth description of this H2OFrame. The description is a tabular print of the type, min, max, sigma, number of zeros, and number of missing elements for each H2OVec in this H2OFrame. :return: None (print to stdout) """ if self._vecs is None or self._vecs == []: raise ValueError("Frame Removed") thousands_sep = h2o.H2ODisplay.THOUSANDS print "Rows:", thousands_sep.format(len(self._vecs[0])), "Cols:", thousands_sep.format(len(self)) headers = [vec._name for vec in self._vecs] table = [ self._row('type', None), self._row('mins', 0), self._row('mean', None), self._row('maxs', 0), self._row('sigma', None), self._row('zero_count', None), self._row('missing_count', None) ] chunk_summary_tmp_key = H2OFrame.send_frame(self) chunk_dist_sum = h2o.frame(chunk_summary_tmp_key)["frames"][0] dist_summary = chunk_dist_sum["distribution_summary"] chunk_summary = chunk_dist_sum["chunk_summary"] h2o.removeFrameShallow(chunk_summary_tmp_key) chunk_summary.show() dist_summary.show() h2o.H2ODisplay(table, [""] + headers, "Column-by-Column Summary") # def __repr__(self): # if self._vecs is None or self._vecs == []: # raise ValueError("Frame Removed") # self.show() # return "" # Find a named H2OVec and return it. Error is name is missing
def _find(self, name):
    """
    Find a named H2OVec and return it.

    :param name: The column name to look up.
    :return: The H2OVec with that name.
    :raises ValueError: If no column has that name.
    """
    return self._vecs[self._find_idx(name)]

# Find a named H2OVec and return the zero-based index for it.  Error is name is missing
def _find_idx(self, name):
    """
    Find a named H2OVec and return the zero-based index of its first occurrence.

    :param name: The column name to look up.
    :return: The zero-based column index.
    :raises ValueError: If no column has that name.
    """
    for i, v in enumerate(self._vecs):
        if name == v._name:
            return i
    raise ValueError("Name " + name + " not in Frame")

# Column selection via integer, string (name) returns a Vec
# Column selection via slice returns a subset Frame
# Multi-dimensional slicing via 2-tuple
def __getitem__(self, i):
    """
    Column selection via integer, string(name)
    Column selection via slice returns a subset of the H2OFrame

    :param i: An int, str, slice, H2OVec, or list/tuple
    :return: An H2OVec, an H2OFrame, or scalar depending on the input slice.
    :raises ValueError: If the frame has been removed or a name is unknown.
    :raises NotImplementedError: If the selector type is not supported.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    if isinstance(i, int):
        return self._vecs[i]
    if isinstance(i, str):
        return self._find(i)
    # Slice; return a Frame not a Vec
    if isinstance(i, slice):
        return H2OFrame(vecs=self._vecs[i])
    # Row selection from a boolean Vec
    if isinstance(i, H2OVec):
        self._len_check(i)
        return H2OFrame(vecs=[x.row_select(i) for x in self._vecs])

    # have a list/tuple of numbers or strings
    # (a tuple of exactly two items is reserved for [rows, cols] slicing below)
    if isinstance(i, list) or (isinstance(i, tuple) and len(i) != 2):
        vecs = []
        for it in i:
            if isinstance(it, int):
                vecs.append(self._vecs[it])
            elif isinstance(it, str):
                vecs.append(self._find(it))
            else:
                raise NotImplementedError
        return H2OFrame(vecs=vecs)

    # multi-dimensional slicing via 2-tuple
    if isinstance(i, tuple) and len(i) == 2:
        res = self[i[1]]  # Slice by columns eagerly
        # Now slice by rows
        if isinstance(res, H2OFrame):
            if isinstance(i[0], slice):
                return H2OFrame(vecs=[vec[i[0]] for vec in res._vecs])
            elif isinstance(i[0], int):
                return H2OFrame(python_obj=[vec[i[0]] for vec in res._vecs])
            else:
                raise NotImplementedError
        if isinstance(res, H2OVec):
            return res[i[0]]
        raise NotImplementedError
    raise NotImplementedError("Slicing by unknown type: " + str(type(i)))

def __setitem__(self, b, c):
    """
    Replace a column in an H2OFrame, or append one under a new name.

    :param b: A 0-based index or a column name. An unknown name appends `c` as a
              new column; an index must refer to an existing column.
    :param c: The vector that 'b' is replaced with. Its name is set to the
              column's name.
    :return: None. The frame is modified in place.
    :raises ValueError: If the frame has been removed or the index is out of range.
    :raises NotImplementedError: If `b` is neither a str nor an int.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    ncols = len(self._vecs)

    # b is a named column, fish out the H2OVec and its index
    if isinstance(b, str):
        for i, vec in enumerate(self._vecs):
            if b == vec._name:
                break
        else:
            i = ncols  # Not found, so append at end
    # b is a 0-based column index
    elif isinstance(b, int):
        # an index equal to the column count has no existing name to inherit,
        # so it is rejected together with every other out-of-range index
        if b < 0 or b >= ncols:
            raise ValueError("Index out of range: 0 <= " + str(b) + " < " + str(ncols))
        i = b
        b = self._vecs[i]._name
    else:
        raise NotImplementedError
    self._len_check(c)
    # R-like behavior: the column name remains the same, even if replacing via index
    c._name = b
    if i >= ncols:
        self._vecs.append(c)
    else:
        self._vecs[i] = c

# Modifies the collection in-place to remove a named item
def __delitem__(self, i):
    """
    Remove the vec with the given column name from this frame, in place.

    :param i: The name of the vec to delete (only str is supported).
    :return: The Vec that was removed.
    :raises ValueError: If the frame has been removed or the name is unknown.
    :raises NotImplementedError: If `i` is not a str.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    if isinstance(i, str):
        return self._vecs.pop(self._find_idx(i))
    raise NotImplementedError

# Makes a new collection
def drop(self, i):
    """
    Build a new H2OFrame that omits one column; this frame is left untouched.

    Every column carrying the same name as the selected one is left out of the
    result.

    :param i: Column to leave out, given as a name (str) or a 0-based index (int).
    :return: Returns a new H2OFrame without the selected column.
    :raises ValueError: If the frame has been removed, the name is unknown, or
                        the index is out of range.
    :raises NotImplementedError: If `i` is neither a str nor an int.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")

    # i is a named column
    if isinstance(i, str):
        if not any(i == v._name for v in self._vecs):
            raise ValueError("Name " + i + " not in Frame")
        return H2OFrame(vecs=[v for v in self._vecs if i != v._name])

    # i is a 0-based column
    if isinstance(i, int):
        if i < 0 or i >= self.__len__():
            raise ValueError("Index out of range: 0 <= " + str(i) + " < " + str(self.__len__()))
        unwanted = self._vecs[i]._name
        return H2OFrame(vecs=[v for v in self._vecs if v._name != unwanted])

    raise NotImplementedError
def __len__(self):
    """
    :return: Number of columns in this H2OFrame
    :raises ValueError: If the frame has been removed.
    """
    columns = self._vecs
    if columns is None or columns == []:
        raise ValueError("Frame Removed")
    return len(columns)
def logical_negation(self):
    """
    Build a new H2OFrame whose columns wrap each column of this frame in a "not"
    expression. Column names are carried over unchanged.

    :return: A new H2OFrame.
    """
    negated = []
    for column in self._vecs:
        negated.append(H2OVec(column._name, Expr("not", column)))
    return H2OFrame(vecs=negated)
def _simple_frames_bin_op(self, data, op):
    """
    Apply the binary operator `op` column-wise with this frame on the left.

    :param data: The right operand: an H2OFrame (paired column by column), an
                 H2OVec (applied to every column), or an int/float/str scalar.
    :param op: The operator code handed to Expr.
    :return: A new H2OFrame of lazily built columns.
    :raises ValueError: If the frame has been removed.
    :raises NotImplementedError: If `data` has an unsupported type.
    """
    if len(self) == 0:
        return self
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    self._len_check(data)

    if isinstance(data, H2OFrame):
        combined = [H2OVec(lhs._name, Expr(op, lhs._len_check(rhs), rhs))
                    for lhs, rhs in zip(self._vecs, data._vecs)]
    elif isinstance(data, H2OVec):
        combined = [H2OVec(lhs._name, Expr(op, lhs._len_check(data), data))
                    for lhs in self._vecs]
    elif isinstance(data, (int, float, str)):
        combined = [H2OVec(lhs._name, Expr(op, lhs, Expr(data)))
                    for lhs in self._vecs]
    else:
        raise NotImplementedError
    return H2OFrame(vecs=combined)

def _simple_frames_bin_rop(self, data, op):
    """
    Apply the binary operator `op` column-wise with this frame on the right.

    :param data: The left operand: an H2OFrame, an H2OVec, or an int/float/str scalar.
    :param op: The operator code handed to Expr.
    :return: A new H2OFrame of lazily built columns.
    :raises ValueError: If the frame has been removed.
    :raises NotImplementedError: If `data` has an unsupported type.
    """
    if len(self) == 0:
        return self
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    self._len_check(data)

    if isinstance(data, H2OFrame):
        combined = [H2OVec(rhs._name, Expr(op, lhs, rhs._len_check(lhs)))
                    for rhs, lhs in zip(self._vecs, data._vecs)]
    elif isinstance(data, H2OVec):
        combined = [H2OVec(rhs._name, Expr(op, data, rhs._len_check(data)))
                    for rhs in self._vecs]
    elif isinstance(data, (int, float, str)):
        combined = [H2OVec(rhs._name, Expr(op, Expr(data), rhs, length=len(rhs)))
                    for rhs in self._vecs]
    else:
        raise NotImplementedError
    return H2OFrame(vecs=combined)

# ops -- arithmetic
def __add__(self, i):
    return self._simple_frames_bin_op(i, "+")

def __sub__(self, i):
    return self._simple_frames_bin_op(i, "-")

def __mul__(self, i):
    return self._simple_frames_bin_op(i, "*")

def __div__(self, i):
    return self._simple_frames_bin_op(i, "/")

def __pow__(self, i):
    return self._simple_frames_bin_op(i, "^")

# ops -- logical
def __and__(self, i):
    return self._simple_frames_bin_op(i, "&")

def __or__(self, i):
    return self._simple_frames_bin_op(i, "|")

# ops -- comparisons (single-letter operator codes are passed to Expr as-is)
def __gt__(self, i):
    return self._simple_frames_bin_op(i, "g")

def __ge__(self, i):
    return self._simple_frames_bin_op(i, "G")

def __lt__(self, i):
    return self._simple_frames_bin_op(i, "l")

def __le__(self, i):
    return self._simple_frames_bin_op(i, "L")

def __eq__(self, i):
    return self._simple_frames_bin_op(i, "n")

def __ne__(self, i):
    return self._simple_frames_bin_op(i, "N")

# rops -- the operand-order-independent ones reuse the forward operator
def __radd__(self, i):
    return self.__add__(i)

def __rmul__(self, i):
    return self.__mul__(i)

def __rand__(self, i):
    return self.__and__(i)

def __ror__(self, i):
    return self.__or__(i)

def __rsub__(self, i):
    return self._simple_frames_bin_rop(i, "-")

def __rdiv__(self, i):
    return self._simple_frames_bin_rop(i, "/")

def __rpow__(self, i):
    return self._simple_frames_bin_rop(i, "^")

# unops
def __abs__(self):
    return h2o.abs(self)

@staticmethod
def py_tmp_key():
    """
    :return: a unique h2o key obvious from python
    """
    random_part = str(uuid.uuid4())
    return unicode("py" + random_part)

# Send over a frame description to H2O
def send_frame(self):
    """
    Send a frame description to H2O, returns a key.

    One rapids call binds this frame's columns under a fresh temporary key and
    assigns the column names.

    :return: A key
    :raises ValueError: If the frame has been removed.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")

    # Send over the frame
    fr = H2OFrame.py_tmp_key()

    # false flag means no deep copy!
    vec_keys = "' '".join([vec._expr.eager() for vec in self._vecs])
    bind_part = "(gput " + fr + " (cbind %FALSE '" + vec_keys + "')) "

    # And frame columns
    last_index = str(len(self) - 1)
    quoted_names = '" "'.join([vec._name for vec in self._vecs])
    names_part = "(colnames= %" + fr + " (: #0 #" + last_index + ") " + "(slist \"" + quoted_names + "\")"

    # fold into a single rapids call
    # NOTE(review): the assembled expression leaves two parentheses unclosed;
    # presumably the Rapids parser tolerates this -- confirm before changing.
    h2o.rapids("(, " + bind_part + names_part)
    return fr
def _row(self, field, idx):
    """
    Build one row: `field` followed by that summary entry of every column.

    :param field: The key looked up in each column's summary().
    :param idx: Position to pick inside each summary entry, or None to take the entry whole.
    :return: A list starting with `field`.
    """
    row = [field]
    for vec in self._vecs:
        entry = vec.summary()[field]
        row.append(entry[idx] if idx is not None and entry is not None else entry)
    return row


# private static methods
@staticmethod
def _gen_header(cols):
    """
    :param cols: Number of columns.
    :return: The default column names "C1" .. "C<cols>".
    """
    return ["C" + str(c) for c in range(1, cols + 1, 1)]


@staticmethod
def _check_lists_of_lists(python_obj):
    """
    Validate that `python_obj` is a list of flat lists/tuples.

    :raises ValueError: If some item is not a list/tuple, or if some item is itself nested.
    """
    # all items in the list must be a list too
    lol_all = all(isinstance(l, (tuple, list)) for l in python_obj)
    # All items in the list must be a list!
    if not lol_all:
        raise ValueError("`python_obj` is a mixture of nested lists and other types.")

    # in fact, we must have a list of flat lists!
    for l in python_obj:
        if any(isinstance(ll, (tuple, list)) for ll in l):
            raise ValueError("`python_obj` is not a list of flat lists!")


@staticmethod
def _handle_python_lists(python_obj):
    """
    Shape a flat list (one row) or a list of flat lists (one row each) for csv.DictWriter.

    :return: (header, data_to_write), where data_to_write is a list of row dicts.
    """
    cols = len(python_obj)  # cols will be len(python_obj) if not a list of lists

    # do we have a list of lists: [[...], ..., [...]] ?
    lol = H2OFrame._is_list_of_lists(python_obj)
    if lol:
        # must be a list of flat lists, raise ValueError if not
        H2OFrame._check_lists_of_lists(python_obj)
        # have list of lists, each list is a row
        # length of the longest list is the number of columns
        cols = max([len(l) for l in python_obj])

    # create the header
    header = H2OFrame._gen_header(cols)
    # shape up the data for csv.DictWriter
    data_to_write = [dict(zip(header, row)) for row in python_obj] if lol else [dict(zip(header, python_obj))]
    return header, data_to_write


@staticmethod
def _is_list_of_lists(o):
    """
    :return: True if at least one item of `o` is a list or tuple.
    """
    return any(isinstance(l, (list, tuple)) for l in o)


@staticmethod
def _handle_python_dicts(python_obj):
    """
    Shape a dict of columns ({name: values}) for csv.DictWriter.

    List/tuple values must be flat. A scalar or string value is treated as a
    one-element column (previously a scalar raised TypeError inside izip_longest
    and a string was split into one row per character). Shorter columns are
    padded with None.

    :return: (header, data_to_write), where data_to_write is a list of row dicts.
    :raises ValueError: If a column name is invalid or a list/tuple value is nested.
    """
    header = list(python_obj.keys())  # a real list on Python 2 and 3 alike
    # is this a valid header?
    is_valid = all(re.match(r'^[a-zA-Z_][a-zA-Z0-9_.]*$', col) for col in header)
    if not is_valid:
        raise ValueError("Did not get a valid set of column names! Must match the regular expression: ^[a-zA-Z_][a-zA-Z0-9_.]*$ ")

    columns = []
    for k in header:
        v = python_obj[k]
        if isinstance(v, (tuple, list)):
            # if value is a tuple/list, then it must be flat
            if any(isinstance(item, (list, tuple)) for item in v):
                raise ValueError("Values in the dictionary must be flattened!")
            columns.append(v)
        elif isinstance(v, str) or not hasattr(v, "__iter__"):
            # Scalars and strings (Python 2 str/unicode have no __iter__) become
            # a single-row column instead of being iterated.
            columns.append([v])
        else:
            columns.append(v)

    # izip_longest on Python 2, zip_longest on Python 3
    zip_longest = getattr(itertools, "izip_longest", None) or itertools.zip_longest
    rows = [list(r) for r in zip_longest(*columns)]
    data_to_write = [dict(zip(header, row)) for row in rows]
    return header, data_to_write


# @staticmethod
# def _handle_numpy_array(python_obj):
#     header = H2OFrame._gen_header(python_obj.shape[1])
#
#     as_list = python_obj.tolist()
#     lol = H2OFrame._is_list_of_lists(as_list)
#     data_to_write = [dict(zip(header, row)) for row in as_list] \
#         if lol else [dict(zip(header, as_list))]
#
#     return header, data_to_write

def _len_check(self, x):
    """
    Delegate the length compatibility check to this frame's first column.

    :return: None when len(self) == 0, otherwise whatever the first column's _len_check returns.
    """
    if len(self) == 0:
        return
    return self._vecs[0]._len_check(x)

# Quantiles
def quantile(self, prob=None, combine_method="interpolate"):
    """
    Compute quantiles over a given H2OFrame.

    :param prob: A list or tuple of probabilities, default is [0.01,0.1,0.25,0.333,0.5,0.667,0.75,0.9,0.99].
                 You may provide a sequence of any length.
    :param combine_method: For even samples, how to combine quantiles.
                           Should be one of ["interpolate", "average", "low", "high"]
    :return: an H2OFrame containing the quantiles and probabilities
             (or `self` unchanged when len(self) == 0).
    :raises ValueError: If the frame has been removed, `prob` is not a list/tuple, or
                        `combine_method` is not one of the accepted names.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    if len(self) == 0:
        return self
    if not prob:
        prob = [0.01, 0.1, 0.25, 0.333, 0.5, 0.667, 0.75, 0.9, 0.99]
    # Tuples are accepted as well: the docstring has always promised "any sequence".
    if not isinstance(prob, (list, tuple)):
        raise ValueError("prob must be a list")
    probs = "(dlist #" + " #".join([str(p) for p in prob]) + ")"
    methods = ["interpolate", "average", "low", "high"]
    if combine_method not in methods:
        raise ValueError("combine_method must be one of: [" + ",".join(methods) + "]")

    key = self.send_frame()
    tmp_key = H2OFrame.py_tmp_key()
    # NOTE(review): this expression leaves two "(" unclosed; kept byte-for-byte --
    # confirm the Rapids parser tolerates that.
    expr = "(= !{} (quantile '{}' {} '{}'".format(tmp_key, key, probs, combine_method)
    try:
        h2o.rapids(expr)
    finally:
        # Remove h2o temp frame after quantile, even when the rapids call fails
        h2o.removeFrameShallow(key)

    # Make backing H2OVecs for the remote h2o vecs
    j = h2o.frame(tmp_key)
    fr = j['frames'][0]       # Just the first (only) frame
    rows = fr['rows']         # Row count
    veckeys = fr['vec_ids']   # List of h2o vec keys
    colnames = [col['label'] for col in fr['columns']]
    vecs = H2OVec.new_vecs(zip(colnames, veckeys), rows)  # Peel the Vecs out of the returned Frame
    h2o.removeFrameShallow(tmp_key)
    return H2OFrame(vecs=vecs)

# H2OFrame Mutating cbind
def cbind(self, data):
    """
    Append column(s) to this frame in place.

    :param data: H2OFrame or H2OVec to cbind to self
    :return: void
    :raises ValueError: If this frame has been removed, or `data` is neither an
                        H2OFrame nor an H2OVec.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    if isinstance(data, H2OVec):
        self._vecs.append(data)
        return
    if not isinstance(data, H2OFrame):
        raise ValueError("data to cbind must be H2OVec or H2OFrame")
    # Snapshot the incoming columns first, so binding a frame to itself appends
    # each of its current columns exactly once.
    for incoming in list(data._vecs):
        self._vecs.append(incoming)

# ddply in h2o
def ddply(self, cols, fun):
    """
    :param cols: Column names used to control grouping
    :param fun: Function to execute on each group.  Right now limited to textual Rapids expression
    :return: New frame with 1 row per-group, of results from 'fun'
    :raises ValueError: If this frame has been removed.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    # Confirm all names present in dataset; collect column indices
    rapids_series = "(llist #" + " #".join([str(self._find_idx(name)) for name in cols]) + ")"

    # Eagerly eval and send the cbind'd frame over
    key = self.send_frame()
    tmp_key = H2OFrame.py_tmp_key()
    expr = "(= !{} (h2o.ddply %{} {} {}))".format(tmp_key, key, rapids_series, fun)
    try:
        h2o.rapids(expr)  # ddply in h2o
    finally:
        # Remove h2o temp frame after ddply, even when the rapids call fails
        h2o.removeFrameShallow(key)

    # Make backing H2OVecs for the remote h2o vecs
    j = h2o.frame(tmp_key)     # Fetch the frame as JSON
    fr = j['frames'][0]        # Just the first (only) frame
    rows = fr['rows']          # Row count
    veckeys = fr['vec_ids']    # List of h2o vec keys
    # A separate name is used for the result's labels so the `cols` argument is not shadowed.
    colnames = [col['label'] for col in fr['columns']]
    vecs = H2OVec.new_vecs(zip(colnames, veckeys), rows)  # Peel the Vecs out of the returned Frame
    h2o.removeFrameShallow(tmp_key)
    return H2OFrame(vecs=vecs)
def group_by(self, cols, a):
    """
    GroupBy

    :param cols: The columns to group on.
    :param a: A dictionary of aggregates having the following shape:
              {"colname":[aggregate, column, naMethod]}
              e.g.: {"bikes":["count", 0, "all"]}
              `column` may be a column name or a column index.

              The naMethod is one of "all", "ignore", or "rm", which specifies how to handle
              NAs that appear in columns that are being aggregated.

              "all" - include NAs
              "rm"  - exclude NAs
              "ignore" - ignore NAs in aggregates, but count them (e.g. in denominators for mean, var, sd, etc.)
    :return: The group by frame.
    :raises ValueError: If this frame has been removed.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    rapids_series = "(llist #" + " #".join([str(self._find_idx(name)) for name in cols]) + ")"

    # Translate the aggregates BEFORE anything is sent to the cluster, so that a bad
    # column name cannot leave a temporary frame behind.
    aggregates = copy.deepcopy(a)  # work on a copy; the caller's dict is left untouched
    aggs = []
    # transform cols in aggregates to their indices...
    for k in aggregates:
        target = aggregates[k][1]
        # unicode names are looked up too, matching the (unicode, str) handling in impute()
        if isinstance(target, (unicode, str)):
            target = self._find_idx(target)
        aggregates[k][1] = '#' + str(target)
        aggs += ["\"{1}\" {2} \"{3}\" \"{0}\"".format(str(k), *aggregates[k])]
    aggs = "(agg {})".format(" ".join(aggs))

    key = self.send_frame()
    tmp_key = H2OFrame.py_tmp_key()
    expr = "(= !{} (GB %{} {} {}))".format(tmp_key, key, rapids_series, aggs)
    try:
        h2o.rapids(expr)  # group by
    finally:
        # Remove h2o temp frame after groupby, even when the rapids call fails
        h2o.removeFrameShallow(key)

    # Make backing H2OVecs for the remote h2o vecs
    j = h2o.frame(tmp_key)
    fr = j['frames'][0]       # Just the first (only) frame
    rows = fr['rows']         # Row count
    veckeys = fr['vec_ids']   # List of h2o vec keys
    colnames = [col['label'] for col in fr['columns']]
    vecs = H2OVec.new_vecs(zip(colnames, veckeys), rows)  # Peel the Vecs out of the returned Frame
    h2o.removeFrameShallow(tmp_key)
    return H2OFrame(vecs=vecs)
def impute(self, column, method, combine_method, by, inplace):
    """
    Impute a column in this H2OFrame.

    :param column: The column to impute: a name, an index, an H2OVec, or a list
                   (only its first element is used).
    :param method: How to compute the imputation value. A list with more than one entry
                   selects the default: "mode" for factor columns, "mean" otherwise.
    :param combine_method: For even samples and method="median", how to combine quantiles.
                           A list with more than one entry selects "interpolate";
                           "lo"/"hi" are accepted as aliases of "low"/"high".
    :param by: Columns to group-by for computing imputation value per groups of columns
               (names, indices or H2OVecs). None or an empty list means no grouping.
    :param inplace: Impute inplace?
    :return: the imputed frame (`self` when `inplace` is true, a new H2OFrame otherwise).
    :raises ValueError: If the frame has been removed, the column cannot be resolved, the
                        method is incompatible with the column or with `by`, or `by` is of
                        an unsupported type.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")

    # sanity check columns, get the column index
    if isinstance(column, list):
        column = column[0]  # only take the first one ever...

    if isinstance(column, (unicode, str)):
        col_id = self._find_idx(column)
    elif isinstance(column, int):
        col_id = column
    elif isinstance(column, H2OVec):
        # Match the vec by name. (The old code compared against an undefined name inside
        # a bare except, so this branch could never succeed.)
        try:
            col_id = [a._name == column._name for a in self].index(True)
        except ValueError:
            raise ValueError("No column found to impute.")
    else:
        # Unsupported column types used to fall through with index -1.
        raise ValueError("No column found to impute.")

    # setup the defaults, "mean" for numeric, "mode" for enum
    if isinstance(method, list) and len(method) > 1:
        method = "mode" if self[col_id].isfactor() else "mean"
    elif isinstance(method, list):
        method = method[0]

    # choose "interpolate" by default for combine_method; a one-element list is
    # unwrapped the same way `method` is
    if isinstance(combine_method, list):
        combine_method = combine_method[0] if len(combine_method) == 1 else "interpolate"
    if combine_method == "lo":
        combine_method = "low"
    if combine_method == "hi":
        combine_method = "high"

    # normalise `by`: a single value becomes a list, an empty list means "no grouping"
    if by is not None and not isinstance(by, list):
        by = [by]  # just make it into a list...
    if by is not None and len(by) == 0:
        by = None

    # sanity check method
    if method == "median":
        # no by and median!
        if by is not None:
            raise ValueError("Unimplemented: No `by` and `median`. Please select a different method (e.g. `mean`).")

    # method cannot be median or mean for factor columns
    if self[col_id].isfactor() and method not in ["ffill", "bfill", "mode"]:
        raise ValueError("Column is categorical, method must not be mean or median.")

    # setup the group by columns
    gb_cols = "()"
    if by is not None:
        first = by[0]
        if isinstance(first, (unicode, str)):
            by = [self._find_idx(name) for name in by]
        elif isinstance(first, int):
            pass  # already column indices
        elif isinstance(first, H2OVec):
            by = [[a._name == v._name for a in self].index(True) for v in by]
        else:
            raise ValueError("`by` is not a supported type")
        gb_cols = "(llist #" + " #".join([str(b) for b in by]) + ")"

    key = self.send_frame()

    if inplace:
        # frame, column, method, combine_method, gb_cols, inplace
        # NOTE(review): this expression is never closed with ")"; kept byte-for-byte --
        # confirm the Rapids parser tolerates that.
        expr = "(h2o.impute %{} #{} \"{}\" \"{}\" {} %TRUE".format(key, col_id, method, combine_method, gb_cols)
        try:
            h2o.rapids(expr)  # exec the thing
        finally:
            h2o.removeFrameShallow(key)  # "soft" delete of the frame key, keeps vecs live
        return self

    tmp_key = H2OFrame.py_tmp_key()
    expr = "(= !{} (h2o.impute %{} #{} \"{}\" \"{}\" {} %FALSE))".format(tmp_key, key, col_id, method, combine_method, gb_cols)
    try:
        h2o.rapids(expr)  # exec the thing
    finally:
        # drop the temp frame even when the rapids call fails
        h2o.removeFrameShallow(key)

    # Make backing H2OVecs for the remote h2o vecs
    j = h2o.frame(tmp_key)
    fr = j['frames'][0]       # Just the first (only) frame
    rows = fr['rows']         # Row count
    veckeys = fr['vec_ids']   # List of h2o vec keys
    colnames = [col['label'] for col in fr['columns']]
    vecs = H2OVec.new_vecs(zip(colnames, veckeys), rows)  # Peel the Vecs out of the returned Frame
    h2o.removeFrameShallow(tmp_key)  # soft delete the new Frame, keep the imputed Vecs alive
    return H2OFrame(vecs=vecs)
def merge(self, other, allLeft=False, allRite=False):
    """
    Merge two datasets based on common column names

    :param other: Other dataset to merge.  Must have at least one column in common with self, and all columns in
                  common are used as the merge key.  If you want to use only a subset of the columns in common,
                  rename the other columns so the columns are unique in the merged result.
    :param allLeft: If true, include all rows from the left/self frame
    :param allRite: If true, include all rows from the right/other frame
    :return: A new H2OFrame holding the merged columns and rows
    :raises ValueError: If either frame has been removed, or the frames share no column name.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    # An `other` without vecs used to hit an unbound loop variable in the common-column
    # check; report it the same way as a removed left frame.
    if other._vecs is None or other._vecs == []:
        raise ValueError("Frame Removed")

    own_names = set(vec._name for vec in self._vecs)
    if not any(vec._name in own_names for vec in other._vecs):
        raise ValueError("frames must have some columns in common to merge on")

    # Eagerly eval and send the cbind'd frame over
    lkey = self.send_frame()
    rkey = other.send_frame()
    tmp_key = H2OFrame.py_tmp_key()
    expr = "(= !{} (merge %{} %{} %{} %{}))".format(tmp_key, lkey, rkey,
                                                    "TRUE" if allLeft else "FALSE",
                                                    "TRUE" if allRite else "FALSE")
    # Remove h2o temp frame after merge
    expr2 = "(, " + expr + " (del %" + lkey + " #0) (del %" + rkey + " #0) )"
    h2o.rapids(expr2)  # merge in h2o

    # Make backing H2OVecs for the remote h2o vecs
    j = h2o.frame(tmp_key)     # Fetch the frame as JSON
    fr = j['frames'][0]        # Just the first (only) frame
    rows = fr['rows']          # Row count
    veckeys = fr['vec_ids']    # List of h2o vec keys
    colnames = [col['label'] for col in fr['columns']]
    vecs = H2OVec.new_vecs(zip(colnames, veckeys), rows)  # Peel the Vecs out of the returned Frame
    h2o.removeFrameShallow(tmp_key)
    return H2OFrame(vecs=vecs)

# generic reducers (min, max, sum, var)
def min(self):
    """
    :return: The minimum value of all frame entries
    :raises ValueError: If this frame has been removed.
    """
    # Same guard as the other frame methods, instead of an opaque error from iterating
    # None or from taking the minimum of an empty list.
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    return min([vec.min() for vec in self._vecs])
def max(self):
    """
    :return: The maximum value of all frame entries
    :raises ValueError: If this frame has been removed.
    """
    # Same guard as the other frame methods, instead of an opaque error from iterating
    # None or from taking the maximum of an empty list.
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    return max([vec.max() for vec in self._vecs])
def sum(self):
    """
    :return: The sum of all frame entries
    :raises ValueError: If this frame has been removed.
    """
    # Same guard as the other frame methods: a frame without vecs counts as removed.
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    return sum([vec.sum() for vec in self._vecs])
def var(self):
    """
    :return: The covariance matrix of the columns in this H2OFrame.
    :raises ValueError: If this frame has been removed.
    """
    if self._vecs is None or self._vecs == []:
        raise ValueError("Frame Removed")
    key = self.send_frame()
    tmp_key = H2OFrame.py_tmp_key()
    expr = "(= !{} (var %{} () %FALSE \"everything\"))".format(tmp_key, key)
    try:
        h2o.rapids(expr)
    finally:
        # Remove h2o temp frame after var, even when the rapids call fails
        h2o.removeFrameShallow(key)

    # Make backing H2OVecs for the remote h2o vecs
    j = h2o.frame(tmp_key)
    fr = j['frames'][0]       # Just the first (only) frame
    rows = fr['rows']         # Row count
    veckeys = fr['vec_ids']   # List of h2o vec keys
    colnames = [col['label'] for col in fr['columns']]
    vecs = H2OVec.new_vecs(zip(colnames, veckeys), rows)  # Peel the Vecs out of the returned Frame
    h2o.removeFrameShallow(tmp_key)
    return H2OFrame(vecs=vecs)
class H2OVec:
    """
    A single column of data that is uniformly typed and possibly lazily computed.
    """

    def __init__(self, name, expr):
        """
        Create a new instance of an H2OVec object

        :param name: The name of the column corresponding to this H2OVec.
        :param expr: The lazy expression representing this H2OVec
        :return: A new H2OVec
        """
        assert isinstance(name, str)
        assert isinstance(expr, Expr)
        self._name = name  # String
        self._expr = expr  # Always an expr
        expr._name = name  # Pass name along to expr

    @staticmethod
    def new_vecs(vecs=None, rows=-1):
        """
        Wrap (column name, vec key) pairs in H2OVec objects.

        :param vecs: An iterable of (name, veckey) pairs; each veckey is indexed with 'name'.
        :param rows: The length given to every backing Expr.
        :return: A list of H2OVecs, or `vecs` itself when it is empty or None.
        """
        if not vecs:
            return vecs
        return [H2OVec(str(col), Expr(op=veckey['name'], length=rows)) for col, veckey in vecs]

    def name(self):
        """
        :return: Return the column name for this H2OVec
        """
        return self._name

    def key(self):
        """
        :return: Return the H2O Key for this Vec, or "" when the Expr's data is not a string.
        """
        return self._expr._data if isinstance(self._expr._data, (unicode, str)) else ""

    def setName(self, name):
        """
        Set the column name for this column.

        :param name: The new name for this column.
        :return: None
        :raises ValueError: If `name` is empty or not a str.
        """
        # NOTE(review): unlike __init__, the new name is not passed along to self._expr -- confirm intended.
        if name and isinstance(name, str):
            self._name = name
        else:
            raise ValueError("name parameter must be a string")

    def get_expr(self):
        """
        Helper method to obtain the expr object in self. Can also get it directly @ ._expr.

        :return: the _expr member of this H2OVec
        """
        return self._expr

    def append(self, data):
        """
        Append a value during CSV read, convert to float.

        :param data: An element being appended to the end of this H2OVec
        :return: void
        """
        try:
            value = float(data)
        except (ValueError, TypeError):
            # Not convertible (e.g. a non-numeric string, or None): keep the raw value.
            value = data
        self._expr.data().append(value)
        self._expr.set_len(self._expr.get_len() + 1)

    # H2OVec non-mutating cbind
    def cbind(self, data):
        """
        :param data: H2OFrame or H2OVec
        :return: new H2OFrame with data cbinded to the end
        :raises ValueError: If `data` is neither an H2OFrame nor an H2OVec.
        """
        # Check data type
        if isinstance(data, H2OFrame):
            vecs = [self] + list(data._vecs)
        elif isinstance(data, H2OVec):
            vecs = [self, data]
        else:
            raise ValueError("data parameter must be H2OVec or H2OFrame")
        names = [vec.name() for vec in vecs]

        tmp_key = H2OFrame.py_tmp_key()
        cbind = "(= !" + tmp_key + " (cbind %FALSE %"
        cbind += " %".join([vec._expr.eager() for vec in vecs]) + "))"
        h2o.rapids(cbind)

        j = h2o.frame(tmp_key)
        fr = j['frames'][0]
        rows = fr['rows']
        veckeys = fr['vec_ids']
        colnames = [col['label'] for col in fr['columns']]
        result = H2OFrame(vecs=H2OVec.new_vecs(zip(colnames, veckeys), rows))
        # Shallow-remove the temporary frame once its vecs are peeled out, as the
        # H2OFrame methods in this module do for their temporary keys.
        h2o.removeFrameShallow(tmp_key)
        result.setNames(names)
        return result

    def show(self, noprint=False):
        """
        Pretty print this H2OVec, or return values up to an iterator on an enclosing Frame

        :param noprint: A boolean stating whether to print or to return data.
        :return: If noprint is True, the result of self._expr.show(noprint=True); otherwise
                 None (up to the first 10 rows are displayed).
        """
        if noprint:
            return self._expr.show(noprint=True)

        to_show = [[v] for v in self._expr.show(noprint=True)]
        nrows = min(10, len(to_show))
        # Only the rows that receive a Row ID are displayed, so every displayed row has
        # both columns and the "first N row(s)" header is accurate.
        to_show = to_show[:nrows]
        for row_id, row in enumerate(to_show, 1):
            row.insert(0, row_id)
        header = ["Row ID", self._name + " (first " + str(nrows) + " row(s))"]
        h2o.H2ODisplay(to_show, header)

    # def __repr__(self):
    #     self.show()
    #     return ""

    def summary(self):
        """
        Compute the rollup data summary (min, max, mean, etc.)

        :return: the summary from this Expr object
        """
        return self._expr.summary()

    def __getitem__(self, i):
        """
        Basic index/sliced lookup

        :param i: An H2OVec (boolean row selection), an int (single row) or a slice.
        :return: A new H2OVec for an H2OVec or slice index; the eagerly evaluated value
                 for an int index.
        :raises ValueError: For any other index type.
        """
        if isinstance(i, H2OVec):
            return self.row_select(i)
        if isinstance(i, int):
            # Single row select, makes a scalar
            return Expr("[", self, Expr(i), length=1).eager()
        if isinstance(i, slice):
            e = Expr(i)
            return H2OVec(self._name, Expr("[", self, e, length=len(e)))
        raise ValueError("Row selection from a Vec is limited to 1 row, or a boolean Vec")

    # Boolean column select lookup. Eager, to compute the result length
    def row_select(self, vec):
        """
        Boolean column select lookup

        :param vec: An H2OVec.
        :return: A new H2OVec.
        """
        e = Expr("[", self, vec)
        r = e.eager()
        if isinstance(r, (float, int)):
            e.set_len(1)
        else:
            # Otherwise r is used as a frame key; read the row count from the frame.
            j = h2o.frame(r)
            e.set_len(j['frames'][0]['rows'])
        return H2OVec(self._name, e)

    def __setitem__(self, b, c):
        """
        Update-in-place of a Vec.
        This interface currently only supports whole vector replacement.

        If `c` has length 1, then it's assumed that `c` represents a constant vector
        of its current value.

        :param b: An H2OVec for selecting rows to update in-place.
        :param c: The "new" values that will write over the values stipulated by `b`.
        :return: void
        :raises NotImplementedError: If `b` is not an H2OVec.
        """
        self._len_check(c)
        # row-wise assignment
        if isinstance(b, H2OVec):
            # whole vec replacement
            self._len_check(b)
            # lazy update in-place of the whole vec
            self._expr = Expr("=", Expr("[", self._expr, b), None if c is None else Expr(c))
        else:
            raise NotImplementedError("Only vector replacement is currently supported.")

    # Simple boolean operators, which auto-expand a right scalar argument
    def _simple_vec_bin_op(self, i, op):
        """
        Build the lazy result of `self op i` (this vec is the LEFT operand).

        :param i: An H2OFrame, H2OVec, int, float, Expr or str; None is accepted for op "n" only.
        :param op: The operator token handed to Expr.
        :return: A new H2OVec, or an H2OFrame when `i` is an H2OFrame.
        :raises NotImplementedError: For unsupported operand types.
        """
        if isinstance(i, H2OFrame):
            # This vec is the LEFT operand, so the frame must apply the operator reversed;
            # using its forward helper evaluated `frame op vec`, which is wrong for
            # non-commutative operators such as "-", "/", "^" and the comparisons.
            return i._simple_frames_bin_rop(H2OFrame(vecs=[self]), op)
        if isinstance(i, H2OVec):
            return H2OVec(self._name, Expr(op, self._len_check(i), i))
        if isinstance(i, (int, float)):
            return H2OVec(self._name, Expr(op, self, Expr(i)))
        if isinstance(i, Expr):
            return H2OVec(self._name, Expr(op, self, i))
        if isinstance(i, str):
            return H2OVec(self._name, Expr(op, self, Expr(None, i)))
        if op == "n" and i is None:
            # `vec == None` is expressed as an is.na test
            return H2OVec(self._name, Expr("is.na", self._expr, None))
        raise NotImplementedError

    def _simple_vec_bin_rop(self, i, op):
        """
        Build the lazy result of `i op self` (this vec is the RIGHT operand).

        :param i: An int, float or Expr.
        :param op: The operator token handed to Expr.
        :return: A new H2OVec.
        :raises NotImplementedError: For unsupported operand types.
        """
        if isinstance(i, (int, float)):
            return H2OVec(self._name, Expr(op, Expr(i), self, length=len(self)))
        if isinstance(i, Expr):
            return H2OVec(self._name, Expr(op, i, self, length=len(self)))
        raise NotImplementedError

    def logical_negation(self):
        """
        :return: A new H2OVec wrapping a "not" Expr over this vec.
        """
        return H2OVec(self._name, Expr("not", self))

    def __add__(self, i): return self._simple_vec_bin_op(i, "+")
    def __sub__(self, i): return self._simple_vec_bin_op(i, "-")
    def __and__(self, i): return self._simple_vec_bin_op(i, "&")
    def __or__(self, i): return self._simple_vec_bin_op(i, "|")
    def __div__(self, i): return self._simple_vec_bin_op(i, "/")
    # `/` dispatches to __truediv__ (not __div__) under `from __future__ import division`
    # and on Python 3, so route it to the same operator.
    def __truediv__(self, i): return self._simple_vec_bin_op(i, "/")
    def __mul__(self, i): return self._simple_vec_bin_op(i, "*")
    def __eq__(self, i): return self._simple_vec_bin_op(i, "n")
    def __ne__(self, i): return self._simple_vec_bin_op(i, "N")
    def __pow__(self, i): return self._simple_vec_bin_op(i, "^")
    def __ge__(self, i): return self._simple_vec_bin_op(i, "G")
    def __gt__(self, i): return self._simple_vec_bin_op(i, "g")
    def __le__(self, i): return self._simple_vec_bin_op(i, "L")
    def __lt__(self, i): return self._simple_vec_bin_op(i, "l")

    def __radd__(self, i): return self.__add__(i)  # commutativity
    def __rsub__(self, i): return self._simple_vec_bin_rop(i, "-")  # not commutative
    def __rand__(self, i): return self.__and__(i)  # commutativity (no short circuiting)
    def __ror__(self, i): return self.__or__(i)
    def __rdiv__(self, i): return self._simple_vec_bin_rop(i, "/")  # not commutative
    def __rtruediv__(self, i): return self._simple_vec_bin_rop(i, "/")  # not commutative
    def __rmul__(self, i): return self.__mul__(i)
    def __rpow__(self, i): return self._simple_vec_bin_rop(i, "^")  # not commutative

    def __abs__(self): return h2o.abs(self)

    def __len__(self):
        """
        :return: The length of this H2OVec
        """
        return len(self._expr)

    def dim(self):
        """
        :return: A (length, 1) tuple for this H2OVec
        """
        return len(self), 1

    def floor(self):
        """
        :return: A new H2OVec wrapping a lazy "floor" Expr over this vec.
        """
        return H2OVec(self._name, Expr("floor", self._expr, None))

    # generic reducers (min, max, sum, sd, var, mean, median)
    def min(self):
        """
        :return: Min value of the H2OVec elements.
        """
        return Expr("min", self._expr).eager()

    def max(self):
        """
        :return: Max value of the H2OVec elements.
        """
        return Expr("max", self._expr).eager()

    def sum(self):
        """
        :return: Sum of the H2OVec elements.
        """
        return Expr("sum", self._expr).eager()

    def sd(self):
        """
        :return: Standard deviation of the H2OVec elements.
        """
        return Expr("sd", self._expr).eager()

    def var(self):
        """
        :return: Variance of the H2OVec elements (evaluated eagerly).
        """
        return Expr("var", self._expr).eager()

    def mean(self):
        """
        :return: Mean of this H2OVec.
        """
        return Expr("mean", self._expr).eager()

    def median(self):
        """
        :return: Median of this H2OVec.
        """
        return Expr("median", self._expr).eager()

    def quantile(self, prob=None, combine_method="interpolate"):
        """
        Compute quantiles of this vec by wrapping it in a one-column H2OFrame.

        :param prob: Probabilities; defaults to [0.01,0.1,0.25,0.333,0.5,0.667,0.75,0.9,0.99].
        :param combine_method: Passed through to H2OFrame.quantile.
        :return: Whatever H2OFrame.quantile returns for the one-column frame.
        """
        if not prob:
            prob = [0.01, 0.1, 0.25, 0.333, 0.5, 0.667, 0.75, 0.9, 0.99]
        return H2OFrame(vecs=[self]).quantile(prob, combine_method)

    def asfactor(self):
        """
        :return: A new H2OVec wrapping a lazy Expr that converts this vec to a factor
        """
        return H2OVec(self._name, Expr("as.factor", self._expr, None))

    def isfactor(self):
        """
        :return: The eagerly evaluated result of the "is.factor" test on this vec.
        """
        return Expr("is.factor", self._expr, None, length=1).eager()

    def isna(self):
        """
        :return: Returns a new boolean H2OVec.
        """
        return H2OVec("", Expr("is.na", self._expr, None))

    def month(self):
        """
        :return: Returns a new month column from a msec-since-Epoch column
        """
        return H2OVec(self._name, Expr("month", self._expr, None))

    def dayOfWeek(self):
        """
        :return: Returns a new Day-of-Week column from a msec-since-Epoch column
        """
        return H2OVec(self._name, Expr("dayOfWeek", self._expr, None))

    def runif(self, seed=None):
        """
        :param seed: A random seed. If None, then one will be generated.
        :return: A new H2OVec filled with doubles sampled uniformly from [0,1).
        """
        # Test against None explicitly so that an explicit seed of 0 is honoured.
        if seed is None:
            import random
            seed = random.randint(123456789, 999999999)  # generate a seed
        return H2OVec("", Expr("h2o.runif", self._expr, Expr(seed)))

    # Error if lengths are not compatible.  Return self for flow-coding
    def _len_check(self, x):
        """
        :param x: The other operand; only H2OFrame and H2OVec operands are length-checked
                  (an H2OFrame through its first column).
        :return: self
        :raises ValueError: If `x` is an Expr, or its length differs from this vec's.
        """
        if not x:
            return self
        if isinstance(x, H2OFrame):
            x = x._vecs[0]
        if isinstance(x, Expr):
            raise ValueError("Mixing Vec and Expr")
        if not isinstance(x, H2OVec):
            return self
        if len(self) != len(x):
            raise ValueError("H2OVec length mismatch: " + str(len(self)) + " vs " + str(len(x)))
        return self

    @staticmethod
    def mktime(year=1970, month=0, day=0, hour=0, minute=0, second=0, msec=0):
        """
        All units are zero-based (including months and days).  Missing year is 1970.

        :return: Returns msec since the Epoch: the "mktime" Expr itself when the common
                 argument length is 1, otherwise an H2OVec named "mktime" wrapping it.
        :raises ValueError: If two non-int arguments have different lengths.
        """
        # Some error checking on length
        # NOTE(review): when every argument is an int, xlen stays -1 and an H2OVec of
        # length -1 is returned -- confirm that is intended.
        xlen = -1
        for x in [msec, second, minute, hour, day, month, year]:
            if not isinstance(x, int):
                l2 = len(x)
                if xlen != l2:
                    if xlen == -1:
                        xlen = l2
                    else:
                        raise ValueError("length of " + str(xlen) + " not compatible with " + str(l2))
        # Chain the arguments together with "," Exprs, starting from msec.
        e = None
        for x in [msec, second, minute, hour, day, month, year]:
            x = Expr(x) if isinstance(x, int) else x
            e = Expr(",", x, e)
        e2 = Expr("mktime", e, None, xlen)
        return e2 if xlen == 1 else H2OVec("mktime", e2)