public abstract class MRTask<T extends MRTask<T>> extends DTask<T> implements ForkJoinPool.ManagedBlocker
MRTask provides several map and reduce methods that can be overridden to
specify a computation. Several instances of this class are created to
distribute the computation over F/J threads and machines. Non-transient
fields are copied and serialized to the instances created for map
invocations. Reduce methods can store their results in fields. Results are
serialized and reduced all the way back to the invoking node. When the last
reduce method has been called, the fields of the initial MRTask instance
contain the computation results.
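The difference between transient and non-transient fields can be illustrated with plain Java serialization. This is only an analogy: it uses java.io.Serializable rather than H2O's own serialization, and the class `CountTask` and its fields are hypothetical names introduced for this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical task class; an analogy only, not an H2O type.
class CountTask implements Serializable {
    private static final long serialVersionUID = 1L;

    int threshold = 10;          // non-transient: travels with every serialized copy
    transient int scratch = 99;  // transient: skipped on write, so the copy sees the default 0

    // Serialize and deserialize to mimic shipping the task to another worker.
    static CountTask copyViaSerialization(CountTask task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CountTask) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CountTask copy = copyViaSerialization(new CountTask());
        System.out.println(copy.threshold + " " + copy.scratch); // prints "10 0"
    }
}
```

In the same spirit, inputs that every map call needs belong in non-transient fields, while per-instance bookkeeping can stay transient.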
Apart from the small reduced POJO returned to the calling node, MRTask can
produce output vector(s) as a result. These will have chunks co-located
with the input dataset; however, their number of rows will generally differ,
so they won't be strictly compatible with the original. To produce output
vectors, call the doAll or dfork version that takes the required number of
outputs and override the appropriate map call taking the required number of
NewChunks. MRTask will automatically close the new Appendable vecs, and a
call to outputFrame will make a frame with the newly created Vecs.
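A filtering map shows why output vectors can end up with a different number of rows than the inputs. The sketch below is a plain-Java stand-in, not H2O code: a `double[]` plays the role of a Chunk, a `List<Double>` plays the role of a NewChunk, and `FilterSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types only: double[] mimics a Chunk, List<Double> mimics a NewChunk.
class FilterSketch {
    // Mimics map(Chunk c, NewChunk nc): append only the positive values.
    static void map(double[] chunk, List<Double> newChunk) {
        for (double v : chunk) {
            if (v > 0) newChunk.add(v);
        }
    }

    // One output chunk per input chunk, so outputs stay aligned with inputs
    // chunk by chunk even though their row counts differ.
    static List<List<Double>> run(double[][] chunks) {
        List<List<Double>> outputs = new ArrayList<>();
        for (double[] chunk : chunks) {
            List<Double> nc = new ArrayList<>();
            map(chunk, nc);
            outputs.add(nc);
        }
        return outputs;
    }

    public static void main(String[] args) {
        double[][] chunks = { {1, -2, 3}, {-4, -5}, {6} };
        System.out.println(run(chunks));
    }
}
```

Each output chunk corresponds to one input chunk, but the second output chunk here is empty, so row indices no longer line up with the input.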
Overview
Distributed computation starts by calling doAll or one of the dfork
overloads. doAll simply calls dfork and then blocks until the computation
completes; the dfork overloads are non-blocking. The main paradigm is
divide-conquer-combine using ForkJoin.
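The relationship between a non-blocking start and a blocking call can be sketched with the JDK's ForkJoin classes alone. This is an analogy under stated assumptions, not H2O's implementation: `ForkThenBlockSketch`, `SumTo`, `startAsync`, and `runBlocking` are hypothetical names, with `startAsync` playing the role of dfork and `runBlocking` the role of doAll.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class ForkThenBlockSketch {
    // A trivial task standing in for a distributed computation.
    static class SumTo extends RecursiveTask<Long> {
        private final int n;
        SumTo(int n) { this.n = n; }
        @Override protected Long compute() {
            long sum = 0;
            for (int i = 1; i <= n; i++) sum += i;
            return sum;
        }
    }

    // Non-blocking start (in the spirit of dfork): returns a handle immediately.
    static ForkJoinTask<Long> startAsync(ForkJoinPool pool, int n) {
        return pool.submit(new SumTo(n));
    }

    // Blocking call (in the spirit of doAll): start, then wait for the result.
    static long runBlocking(ForkJoinPool pool, int n) {
        return startAsync(pool, n).join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> handle = startAsync(pool, 100); // caller may do other work here
        System.out.println(handle.join());                 // block for the result, like getResult
        System.out.println(runBlocking(pool, 100));
        pool.shutdown();
    }
}
```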
If doAll is called with Keys, then one map call is made per Key, on the
Key's home node. If MRTask is invoked on a Frame (or equivalently a Vec[]),
then one map call is made per Chunk for all Vecs at once, on the Chunk's
home node. In both modes, reduce is called between each pair of calls to
map.
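The "one map per chunk, reduce between pairs" pattern can be sketched on a single JVM with a RecursiveTask. This is a simplified stand-in, not the MRTask implementation: there is no distribution across nodes, each inner `double[]` mimics one Chunk, and `ChunkSumSketch` and `sumAll` are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ChunkSumSketch extends RecursiveTask<ChunkSumSketch> {
    private final double[][] chunks; // each inner array stands in for one Chunk
    private final int lo, hi;        // half-open range of chunk indices for this instance
    double sum;                      // result field: filled by map, merged by reduce

    ChunkSumSketch(double[][] chunks, int lo, int hi) {
        this.chunks = chunks;
        this.lo = lo;
        this.hi = hi;
    }

    // Per-chunk work: accumulate into this instance's field.
    void map(double[] chunk) {
        for (double v : chunk) sum += v;
    }

    // Combine another instance's partial result into this one.
    void reduce(ChunkSumSketch other) {
        sum += other.sum;
    }

    @Override protected ChunkSumSketch compute() {
        if (hi - lo <= 1) {                 // conquer: at most one chunk left
            if (hi > lo) map(chunks[lo]);
            return this;
        }
        int mid = (lo + hi) >>> 1;          // divide the chunk range in two
        ChunkSumSketch left = new ChunkSumSketch(chunks, lo, mid);
        ChunkSumSketch rite = new ChunkSumSketch(chunks, mid, hi);
        left.fork();                        // left half runs asynchronously
        rite.compute();                     // right half runs in this thread
        left.join();
        reduce(left);                       // combine both halves into this instance
        reduce(rite);
        return this;
    }

    static double sumAll(double[][] chunks) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new ChunkSumSketch(chunks, 0, chunks.length)).sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumAll(new double[][] { {1, 2}, {3}, {4, 5, 6} })); // prints 21.0
    }
}
```

Because results live in fields and reduce merges one instance into another, the top-level instance holds the final answer once the last reduce has run.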
MRTask can also be called with doAllNodes, in which case only the
setupLocal call is made, once per node; neither map nor reduce is called.
Computation is tailored primarily by overriding. The main method is the map
call, sometimes coupled with a reduce call. setupLocal is called once per
node before any map calls are made on that node (but perhaps other nodes
have already started); conversely, closeLocal is called after the last map
call completes on a node (but perhaps other nodes are still computing maps).
postGlobal is called only once, after all maps, reduces and closeLocals, and
only on the home node.
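The ordering guarantees above can be made concrete with a toy, single-node, sequential driver that records which hook runs when. Everything here is an assumption for illustration: `LifecycleSketch` and `runOnSingleNode` are hypothetical, and real execution is parallel and distributed, so the interleaving of map and reduce calls will differ in practice.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the hook order on one node; not H2O code.
class LifecycleSketch {
    final List<String> calls = new ArrayList<>();

    void setupLocal() { calls.add("setupLocal"); }
    void map(int chunkIdx) { calls.add("map"); }
    void reduce() { calls.add("reduce"); }
    void closeLocal() { calls.add("closeLocal"); }
    void postGlobal() { calls.add("postGlobal"); }

    // Hypothetical sequential driver: setupLocal first, a reduce between each
    // pair of maps, closeLocal after the last map, postGlobal at the very end.
    List<String> runOnSingleNode(int numChunks) {
        setupLocal();
        for (int i = 0; i < numChunks; i++) {
            map(i);
            if (i > 0) reduce();
        }
        closeLocal();
        postGlobal();
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(new LifecycleSketch().runOnSingleNode(3));
    }
}
```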
Modifier and Type | Class and Description |
---|---|
static class | MRTask.PostMapAction<T extends MRTask.PostMapAction<T>> |
DTask.DKeyTask<T extends DTask.DKeyTask,V extends Keyed>, DTask.RemoveCall
Modifier and Type | Field and Description |
---|---|
protected AppendableVec[] | _appendables - Appendables are treated separately (roll-ups computed in map/reduce style, cannot be passed via K/V store). |
Frame | _fr - This Frame instance is the handle for computation over a set of Vec instances. |
protected Futures | _fs - We can add more things to block on, in case we want a bunch of lazy tasks produced by children to all end before this top-level task ends. |
protected int | _hi - Internal field to track a range of local Chunks to work on |
Key[] | _keys - This Key[] instance is the handle used for computation when an MRTask is invoked over an array of Key instances. |
protected T | _left - Internal field to track the left & right sub-range of chunks to work on |
protected int | _lo - Internal field to track a range of local Chunks to work on |
protected short | _nhi - The range of Nodes to work on remotely |
protected RPC<T> | _nleft - Internal field to track the left & right remote nodes/JVMs to work on |
protected short | _nlo - The range of Nodes to work on remotely |
protected RPC<T> | _nrite - Internal field to track the left & right remote nodes/JVMs to work on |
protected T | _rite - Internal field to track the left & right sub-range of chunks to work on |
protected boolean | _run_local - If true, run entirely local, which will pull all the data locally. |
protected boolean | _topLocal - Internal field to track if this is a top-level local call |
Modifier | Constructor and Description |
---|---|
public | MRTask() |
protected | MRTask(byte prior) |
protected | MRTask(H2O.H2OCountedCompleter cmp) |
Modifier and Type | Method and Description |
---|---|
AppendableVec[] | appendables() - New Output vectors; may be null. |
void | asyncExecOnAllNodes() |
boolean | block() - Possibly blocks the current thread, for example waiting for a lock or condition. |
protected void | closeLocal() - Override to do any remote cleaning on the last remote instance of this object, for disposing of node-local shared data structures. |
void | compute2() - Called from FJ threads to do local work. |
T | dfork(byte[] outputTypes, Frame fr, boolean runLocal) - Fork the task in strictly non-blocking fashion. |
T | dfork(byte[] outputTypes, Vec... vecs) - Invokes the map/reduce computation over the given Vec instances and produces output Vec instances. |
T | dfork(Frame fr) - Invokes the map/reduce computation over the given Frame instance. |
void | dfork(Key... keys) |
T | dfork(Vec... vecs) |
void | dinvoke(H2ONode sender) - Called once on remote at top level, probably with a subset of the cloud. |
T | doAll(byte[] outputTypes, Frame fr) - Invokes the map/reduce computation over the given Frame. |
T | doAll(byte[] outputTypes, Frame fr, boolean runLocal) - Invokes the map/reduce computation over the given Frame. |
T | doAll(byte[] outputTypes, Vec... vecs) - Invokes the map/reduce computation over the given Vecs. |
T | doAll(byte[] outputTypes, Vec vec, boolean runLocal) - Invokes the map/reduce computation over the given Vec. |
T | doAll(byte outputType, Frame fr) - Invokes the map/reduce computation over the given Frame. |
T | doAll(byte outputType, Vec... vecs) - Invokes the map/reduce computation over the given Vecs. |
T | doAll(Frame fr) - Invokes the map/reduce computation over the given Frame. |
T | doAll(Frame fr, boolean runLocal) - Invokes the map/reduce computation over the given Frame. |
T | doAll(int numberOfOutputs, byte outputType, Frame fr) - Invokes the map/reduce computation over the given Frame. |
T | doAll(Key... keys) |
T | doAll(Vec... vecs) - Invokes the map/reduce computation over the given Vecs. |
T | doAll(Vec vec, boolean runLocal) - Invokes the map/reduce computation over the given Vec. |
T | doAllNodes() |
T | getResult() - Block for and get any final results from a dfork'd MRTask. |
T | getResult(boolean fjManagedBlock) - Block for and get any final results from a dfork'd MRTask. |
boolean | isReleasable() - Returns true if blocking is unnecessary. |
void | map(Chunk c) - Override with your map implementation. |
void | map(Chunk[] cs) - Override with your map implementation. |
void | map(Chunk[] cs, NewChunk nc) - The handy method to generate a new vector based on existing vectors. |
void | map(Chunk[] cs, NewChunk[] ncs) |
void | map(Chunk[] cs, NewChunk nc1, NewChunk nc2) |
void | map(Chunk c0, Chunk c1) - Override with your map implementation. |
void | map(Chunk c0, Chunk c1, Chunk c2) - Override with your map implementation. |
void | map(Chunk c0, Chunk c1, Chunk c2, Chunk c3) - Override with your map implementation. |
void | map(Chunk c0, Chunk c1, NewChunk nc) |
void | map(Chunk c, NewChunk nc) |
void | map(Chunk c0, NewChunk nc0, NewChunk nc1) |
void | map(Key key) - Override with your map implementation. |
protected boolean | modifiesVolatileVecs() |
void | onCompletion(CountedCompleter caller) - OnCompletion - reduce the left and right into self. |
boolean | onExceptionalCompletion(java.lang.Throwable ex, CountedCompleter caller) - Cancel/kill all work as we can, then rethrow... |
Frame | outputFrame() - Get the resulting Frame from this invoked MRTask. |
Frame | outputFrame(Key<Frame> key, java.lang.String[] names, java.lang.String[][] domains) - Get the resulting Frame from this invoked MRTask. |
Frame | outputFrame(java.lang.String[] names, java.lang.String[][] domains) - Get the resulting Frame from this invoked MRTask. |
protected void | postGlobal() |
T | profile() - Used to invoke profiling. |
java.lang.String | profString() |
void | reduce(T mrt) - Override to combine results from 'mrt' into 'this' MRTask. |
protected T | self() |
protected void | setupLocal() - Override to do any remote initialization on the 1st remote instance of this object, for initializing node-local shared data structures. |
MRTask<T> | withPostMapAction(MRTask.PostMapAction<?> postMap) |
copyOver, getDException, hasException, logVerbose, onAck, onAckAck, setException
asBytes, clone, compute, compute1, currThrPriority, frozenType, icer, priority, read, readJSON, reloadFromBytes, write, writeJSON
__tryComplete, addToPendingCount, compareAndSetPendingCount, complete, exec, getCompleter, getPendingCount, getRawResult, setCompleter, setPendingCount, setRawResult, tryComplete
adapt, adapt, adapt, cancel, compareAndSetForkJoinTaskTag, completeExceptionally, fork, get, get, get, getException, getForkJoinTaskTag, getPool, getQueuedTaskCount, getSurplusQueuedTaskCount, helpQuiesce, inForkJoinPool, invoke, invokeAll, invokeAll, invokeAll, isCancelled, isCompletedAbnormally, isCompletedNormally, isDone, join, peekNextLocalTask, pollNextLocalTask, pollTask, quietlyComplete, quietlyInvoke, quietlyJoin, reinitialize, setForkJoinTaskTag, tryUnfork
public Frame _fr
This Frame instance is the handle for computation over a set of Vec instances. This covers invocations of doAll with Frame and Vec[] instances. Top-level calls to doAll wrap Vec instances into a new Frame instance and set this into _fr during a call to dfork.
public Key[] _keys
This Key[] instance is the handle used for computation when an MRTask is invoked over an array of Key instances.
protected AppendableVec[] _appendables
protected transient RPC<T extends MRTask<T>> _nleft
protected transient RPC<T extends MRTask<T>> _nrite
protected transient boolean _topLocal
protected transient T extends MRTask<T> _left
protected transient T extends MRTask<T> _rite
protected short _nlo
protected short _nhi
protected transient int _lo
protected transient int _hi
protected transient Futures _fs
protected boolean _run_local
public MRTask()
protected MRTask(H2O.H2OCountedCompleter cmp)
protected MRTask(byte prior)
public AppendableVec[] appendables()
public final MRTask<T> withPostMapAction(MRTask.PostMapAction<?> postMap)
public java.lang.String profString()
public T profile()
Used to invoke profiling, for example: new MRTask().profile().doAll();
public Frame outputFrame()
public Frame outputFrame(java.lang.String[] names, java.lang.String[][] domains)
names - The names of the columns in the resulting Frame.
domains - The domains of the columns in the resulting Frame.
public Frame outputFrame(Key<Frame> key, java.lang.String[] names, java.lang.String[][] domains)
If key is not null, then the resulting Frame will appear in the DKV. AppendableVec instances are closed into Vec instances, which then appear in the DKV.
key - If null, then the Frame will not appear in the DKV. Otherwise, this result will appear in the DKV under this key.
names - The names of the columns in the resulting Frame.
domains - The domains of the columns in the resulting Frame.
public void map(Chunk c)
public void map(Chunk c0, Chunk c1)
public void map(Chunk c0, Chunk c1, Chunk c2)
public void map(Chunk c0, Chunk c1, Chunk c2, Chunk c3)
public void map(Chunk[] cs)
public void map(Chunk[] cs, NewChunk nc)
cs - input vectors
nc - output vector
public void map(Key key)
public void reduce(T mrt)
protected void setupLocal()
protected void closeLocal()
protected T self()
public final T doAll(Vec... vecs)
Invokes the map/reduce computation over the given Vecs. This call is blocking.
vecs - Perform the computation over these vectors. They may be mutated as a side effect.
public final T doAll(byte[] outputTypes, Vec... vecs)
Invokes the map/reduce computation over the given Vecs. This call is blocking.
public final T doAll(byte outputType, Vec... vecs)
Invokes the map/reduce computation over the given Vecs. This call is blocking.
public final T doAll(Vec vec, boolean runLocal)
Invokes the map/reduce computation over the given Vec. This call is blocking.
vec - Perform the computation over this vector. It may be mutated as a side effect.
runLocal - Run locally by copying data, or run across the cluster?
public final T doAll(byte[] outputTypes, Vec vec, boolean runLocal)
Invokes the map/reduce computation over the given Vec. This call is blocking.
outputTypes - The type of output Vec instances to create. See Vec.T_STR, Vec.T_NUM, Vec.T_CAT and other byte constants in Vec for possible values.
vec - Perform the computation over this vector. It may be mutated as a side effect.
runLocal - Run locally by copying data, or run across the cluster?
public final T doAll(Frame fr, boolean runLocal)
fr - Perform the computation on this Frame instance. This frame may be mutated as a side effect.
runLocal - Run locally by copying data, or run across the cluster?
public final T doAll(Frame fr)
fr - Perform the computation on this Frame instance. This frame may be mutated as a side effect.
public final T doAll(byte[] outputTypes, Frame fr)
public final T doAll(byte outputType, Frame fr)
public final T doAll(byte[] outputTypes, Frame fr, boolean runLocal)
outputTypes - The type of output Vec instances to create. See Vec.T_STR, Vec.T_NUM, Vec.T_CAT and other byte constants in Vec for possible values.
fr - Perform the computation on this Frame instance. This frame may be mutated as a side effect.
runLocal - Run locally by copying data, or run across the cluster?
public final T doAll(int numberOfOutputs, byte outputType, Frame fr)
numberOfOutputs - Number of output vectors for the computation. All of them will have the same type.
outputType - The type of all the output Vec instances to create. See Vec.T_STR, Vec.T_NUM, Vec.T_CAT and other byte constants in Vec for possible values.
fr - Perform the computation on this Frame instance. This frame may be mutated as a side effect.
public void dfork(Key... keys)
public T doAllNodes()
public void asyncExecOnAllNodes()
public final T dfork(byte[] outputTypes, Vec... vecs)
Invokes the map/reduce computation over the given Vec instances and produces output Vec instances. This call is asynchronous. It returns 'this', on which getResult may be invoked by the caller to block for pending computation to complete.
public final T dfork(Frame fr)
Invokes the map/reduce computation over the given Frame instance. getResult may be invoked by the caller to block for pending computation to complete. This call produces no output Vec instances or Frame instances.
fr - Perform the computation on this Frame instance. This frame may be mutated as a side effect.
public final T dfork(byte[] outputTypes, Frame fr, boolean runLocal)
outputTypes - The type of output Vec instances to create. See Vec.T_STR, Vec.T_NUM, Vec.T_CAT and other byte constants in Vec for possible values.
fr - Perform the computation on this Frame instance. This frame may be mutated as a side effect.
runLocal - Run locally by copying data, or run across the cluster?
public final T getResult(boolean fjManagedBlock)
public final T getResult()
public boolean isReleasable()
Returns true if blocking is unnecessary.
isReleasable in interface ForkJoinPool.ManagedBlocker
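The isReleasable and block pair is the JDK's ForkJoinPool.ManagedBlocker contract. A minimal standalone sketch of that contract follows, using a CountDownLatch as the thing being waited on. It does not show how MRTask implements these methods; `LatchBlocker` and `awaitManaged` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

// Hypothetical blocker that waits on a latch; illustrates the interface only.
class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) { this.latch = latch; }

    // True once the latch has reached zero, so no blocking is needed.
    @Override public boolean isReleasable() {
        return latch.getCount() == 0;
    }

    // Waits for the latch, then reports that no further blocking is necessary.
    @Override public boolean block() throws InterruptedException {
        latch.await();
        return true;
    }

    // Blocks via ForkJoinPool.managedBlock, which lets a ForkJoinPool compensate
    // with a spare worker when the caller is one of its threads.
    static void awaitManaged(CountDownLatch latch) {
        try {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(latch::countDown).start();
        awaitManaged(latch);
        System.out.println("released: " + (latch.getCount() == 0));
    }
}
```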
public boolean block() throws java.lang.InterruptedException
Possibly blocks the current thread, for example waiting for a lock or condition.
block in interface ForkJoinPool.ManagedBlocker
Returns true if no additional blocking is necessary (i.e., if isReleasable would return true).
java.lang.InterruptedException - if interrupted while waiting (the method is not required to do so, but is allowed to)
public final void dinvoke(H2ONode sender)
protected boolean modifiesVolatileVecs()
public final void compute2()
compute2 in class H2O.H2OCountedCompleter<T extends MRTask<T>>
public final void onCompletion(CountedCompleter caller)
onCompletion in class CountedCompleter
caller - the task invoking this method (which may be this task itself).
protected void postGlobal()
public final boolean onExceptionalCompletion(java.lang.Throwable ex, CountedCompleter caller)
onExceptionalCompletion in class CountedCompleter
ex - the exception
caller - the task invoking this method (which may be this task itself).