public abstract class MRTask<T extends MRTask<T>> extends DTask<T> implements ForkJoinPool.ManagedBlocker
MRTask provides several map and reduce methods that can be overridden to
specify a computation. Several instances of this class will be created to
distribute the computation over F/J threads and machines. Non-transient
fields are copied and serialized to the instances created for map
invocations. Reduce methods can store their results in fields. Results are
serialized and reduced all the way back to the invoking node. When the last
reduce method has been called, the fields of the initial MRTask instance
contain the computation results.
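The field-copy and reduce flow above can be sketched in plain Java without H2O. The sketch is single-JVM and sequential: SumSketch, its map/reduce methods and the runOver driver are hypothetical, and clone() only stands in for the serialization MRTask performs when it copies non-transient fields to the instances used for map calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an MRTask subclass; this is not the H2O API.
class SumSketch implements Cloneable {
    final double scale; // input field, copied to every instance (like a non-transient field)
    double sum;         // result field, filled by map and combined by reduce

    SumSketch(double scale) { this.scale = scale; }

    // Called once per chunk, on that chunk's own copy of the task.
    void map(double[] chunk) {
        for (double v : chunk) sum += v * scale;
    }

    // Fold another copy's result into this one.
    void reduce(SumSketch other) { sum += other.sum; }

    SumSketch copy() {
        try {
            return (SumSketch) clone(); // stands in for serialization to another thread/node
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e);
        }
    }

    // Sequential driver: every copy is taken from the still-empty initial instance,
    // each copy maps one chunk, and the copies are then reduced back into the
    // initial instance, which ends up holding the result.
    static SumSketch runOver(SumSketch initial, List<double[]> chunks) {
        List<SumSketch> workers = new ArrayList<>();
        for (double[] chunk : chunks) {
            SumSketch w = initial.copy();
            w.map(chunk);
            workers.add(w);
        }
        for (SumSketch w : workers) initial.reduce(w);
        return initial;
    }
}
```

Copying every worker before any reduce keeps a partially accumulated sum from leaking into later copies.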
Apart from the small reduced POJO returned to the calling node, MRTask can
produce output vector(s) as a result. These will have chunks co-located
with the input dataset; however, their number of rows will generally differ,
so they won't be strictly compatible with the original. To produce output
vectors, call a doAll or dfork variant with the required number of outputs
and override the appropriate map call taking the required number of
NewChunks. MRTask will automatically close the new Appendable vecs, and a
call to outputFrame will make a frame with the newly created Vecs.
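A rough, H2O-free picture of the output-vector contract: each input chunk produces exactly one output chunk at the same position, but the mapper may append any number of rows to it. FilterOutputSketch is hypothetical, and plain lists stand in for Chunk and NewChunk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: plain lists stand in for Chunk and NewChunk; not the H2O API.
class FilterOutputSketch {
    // Shaped like map(Chunk c, NewChunk nc): read one input chunk, append rows to nc.
    static void map(double[] in, List<Double> nc) {
        for (double v : in) {
            if (v >= 0) nc.add(v); // drops negatives, so an output chunk can be shorter
        }
    }

    // Output chunk i is produced from (and would be co-located with) input chunk i.
    static List<List<Double>> run(List<double[]> inputChunks) {
        List<List<Double>> outputs = new ArrayList<>();
        for (double[] chunk : inputChunks) {
            List<Double> nc = new ArrayList<>();
            map(chunk, nc);
            outputs.add(nc);
        }
        return outputs;
    }
}
```

The chunk layout matches the input, but the per-chunk row counts do not, which is why such outputs are not strictly compatible with the original Frame.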
Overview
Distributed computation starts by calling doAll or one of the dfork
variants. doAll calls dfork and then blocks until the result is available;
the dfork variants themselves are non-blocking. The main paradigm is
divide-conquer-combine using ForkJoin.
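The divide-conquer-combine paradigm can be illustrated with the JDK's own ForkJoin classes. This is not MRTask's implementation: ChunkSumTask is hypothetical, and its lo/hi/left/rite names only echo MRTask's internal _lo/_hi/_left/_rite fields. Each task splits its range of chunk indices in half, forks one half, computes the other, and adds the two results.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical divide-conquer-combine sketch using java.util.concurrent.
class ChunkSumTask extends RecursiveTask<Long> {
    private final long[][] chunks;
    private final int lo, hi; // half-open range of chunk indices [lo, hi)

    ChunkSumTask(long[][] chunks, int lo, int hi) {
        this.chunks = chunks;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo == 1) {          // leaf: "map" a single chunk
            long s = 0;
            for (long v : chunks[lo]) s += v;
            return s;
        }
        int mid = (lo + hi) >>> 1;   // divide the range in two
        ChunkSumTask left = new ChunkSumTask(chunks, lo, mid);
        ChunkSumTask rite = new ChunkSumTask(chunks, mid, hi);
        left.fork();                 // left half runs asynchronously
        long r = rite.compute();     // right half runs in this thread
        return left.join() + r;      // combine: the "reduce" step
    }

    static long sum(long[][] chunks) {
        if (chunks.length == 0) return 0;
        return ForkJoinPool.commonPool().invoke(new ChunkSumTask(chunks, 0, chunks.length));
    }
}
```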
If doAll is called with Keys, then one map call is made per Key, on the
Key's home node. If MRTask is invoked on a Frame (or equivalently a Vec[]),
then one map call is made per Chunk for all Vecs at once, on the Chunk's
home node. In both modes, reduce is called between each pair of calls to
map.
MRTask can also be called with doAllNodes, in which case only the
setupLocal call is made, once per node; neither map nor reduce is called.
Computation is tailored primarily by overriding. The main method is the
map call, sometimes coupled with a reduce call.
setupLocal is called once per node before any map calls are made on that
node (though other nodes may already have started); conversely, closeLocal
is called after the last map call completes on a node (though other nodes
may still be computing maps).
postGlobal is called only once, after all maps, reduces and closeLocals,
and only on the home node.
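The ordering guarantees above can be illustrated with a sequential simulation. LifecycleSketch and its per-node callbacks are hypothetical; the real MRTask runs nodes and chunks concurrently and interleaves reduce calls, which are omitted here. Only the per-node order (setupLocal, then that node's maps, then closeLocal) and the single final postGlobal on the home node are meant to carry over.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sequential simulation of the MRTask callback order.
class LifecycleSketch {
    final List<String> log = new ArrayList<>();

    void setupLocal(int node)     { log.add("setupLocal@" + node); }
    void map(int node, int chunk) { log.add("map@" + node + ":" + chunk); }
    void closeLocal(int node)     { log.add("closeLocal@" + node); }
    void postGlobal()             { log.add("postGlobal"); }

    // chunksPerNode[n] lists the ids of the chunks homed on node n.
    void run(int[][] chunksPerNode) {
        for (int node = 0; node < chunksPerNode.length; node++) {
            setupLocal(node);                              // once per node, before its maps
            for (int c : chunksPerNode[node]) map(node, c);
            closeLocal(node);                              // after that node's last map
        }
        postGlobal();                                      // once, on the "home" driver, at the end
    }
}
```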
Nested classes/interfaces inherited from class DTask: DTask.DKeyTask<T extends DTask.DKeyTask,V extends Keyed>, DTask.RemoveCall
Modifier and Type | Field and Description |
---|---|
protected AppendableVec[] | _appendables - Appendables are treated separately (roll-ups computed in map/reduce style; they cannot be passed via the K/V store). |
Frame | _fr - This Frame instance is the handle for computation over a set of Vec instances. |
protected Futures | _fs - We can add more things to block on, in case we want a bunch of lazy tasks produced by children to all end before this top-level task ends. |
protected int | _hi - Internal field to track a range of local Chunks to work on. |
Key[] | _keys - This Key[] instance is the handle used for computation when an MRTask is invoked over an array of Key instances. |
protected T | _left - Internal field to track the left & right sub-range of chunks to work on. |
protected int | _lo - Internal field to track a range of local Chunks to work on. |
protected short | _nhi - The range of Nodes to work on remotely. |
protected RPC<T> | _nleft - Internal field to track the left & right remote nodes/JVMs to work on. |
protected short | _nlo - The range of Nodes to work on remotely. |
protected RPC<T> | _nrite - Internal field to track the left & right remote nodes/JVMs to work on. |
protected T | _rite - Internal field to track the left & right sub-range of chunks to work on. |
protected boolean | _run_local - If true, run entirely local, which will pull all the data locally. |
protected boolean | _topLocal - Internal field to track if this is a top-level local call. |
Modifier | Constructor and Description |
---|---|
public | MRTask() |
protected | MRTask(byte prior) |
protected | MRTask(H2O.H2OCountedCompleter cmp) |
Modifier and Type | Method and Description |
---|---|
AppendableVec[] | appendables() - New output vectors; may be null. |
void | asyncExecOnAllNodes() |
boolean | block() - Possibly blocks the current thread, for example waiting for a lock or condition. |
protected void | closeLocal() - Override to do any remote cleaning on the last remote instance of this object, for disposing of node-local shared data structures. |
void | compute2() - Called from FJ threads to do local work. |
T | dfork(byte[] types, Frame fr, boolean run_local) - Fork the task in strictly non-blocking fashion. |
T | dfork(byte[] types, Vec... vecs) - Invokes the map/reduce computation over the given Vec instances and produces output Vec instances. |
T | dfork(Frame fr) - Invokes the map/reduce computation over the given Frame instance. |
void | dfork(Key... keys) |
T | dfork(Vec... vecs) |
void | dinvoke(H2ONode sender) - Called once on remote at top level, probably with a subset of the cloud. |
T | doAll(byte[] types, Frame fr) |
T | doAll(byte[] types, Frame fr, boolean run_local) |
T | doAll(byte[] types, Vec... vecs) |
T | doAll(byte[] types, Vec vec, boolean run_local) |
T | doAll(byte type, Frame fr) |
T | doAll(byte type, Vec... vecs) |
T | doAll(Frame fr) |
T | doAll(Frame fr, boolean run_local) - Invokes the map/reduce computation over the given Frame. |
T | doAll(int nouts, byte type, Frame fr) |
T | doAll(Key... keys) |
T | doAll(Vec... vecs) - Invokes the map/reduce computation over the given Vecs. |
T | doAll(Vec vec, boolean run_local) |
T | doAllNodes() |
T | getResult() - Block for and get any final results from a dfork'd MRTask. |
T | getResult(boolean fjManagedBlock) - Block for and get any final results from a dfork'd MRTask. |
boolean | isReleasable() - Returns true if blocking is unnecessary. |
void | map(Chunk c) - Override with your map implementation. |
void | map(Chunk[] cs) - Override with your map implementation. |
void | map(Chunk[] cs, NewChunk nc) - The handy method to generate a new vector based on existing vectors. |
void | map(Chunk[] cs, NewChunk[] ncs) |
void | map(Chunk[] cs, NewChunk nc1, NewChunk nc2) |
void | map(Chunk c0, Chunk c1) - Override with your map implementation. |
void | map(Chunk c0, Chunk c1, Chunk c2) - Override with your map implementation. |
void | map(Chunk c0, Chunk c1, NewChunk nc) |
void | map(Chunk c, NewChunk nc) |
void | map(Chunk c0, NewChunk nc0, NewChunk nc1) |
void | map(Key key) - Override with your map implementation. |
protected boolean | modifiesVolatileVecs() |
void | onCompletion(CountedCompleter caller) - OnCompletion: reduce the left and right into self. |
boolean | onExceptionalCompletion(java.lang.Throwable ex, CountedCompleter caller) - Cancel/kill all work as we can, then rethrow... |
Frame | outputFrame() - Get the resulting Frame from this invoked MRTask. |
Frame | outputFrame(Key<Frame> key, java.lang.String[] names, java.lang.String[][] domains) - Get the resulting Frame from this invoked MRTask. |
Frame | outputFrame(java.lang.String[] names, java.lang.String[][] domains) - Get the resulting Frame from this invoked MRTask. |
protected void | postGlobal() |
T | profile() - Used to invoke profiling. |
java.lang.String | profString() |
void | reduce(T mrt) - Override to combine results from 'mrt' into 'this' MRTask. |
protected T | self() |
protected void | setupLocal() - Override to do any remote initialization on the 1st remote instance of this object, for initializing node-local shared data structures. |
Methods inherited from class DTask: copyOver, getDException, hasException, logVerbose, onAck, onAckAck, setException
Methods inherited from class H2O.H2OCountedCompleter: asBytes, clone, compute, compute1, currThrPriority, frozenType, icer, priority, read, readJSON, reloadFromBytes, write, writeJSON
Methods inherited from class CountedCompleter: __tryComplete, addToPendingCount, compareAndSetPendingCount, complete, exec, getCompleter, getPendingCount, getRawResult, setCompleter, setPendingCount, setRawResult, tryComplete
Methods inherited from class ForkJoinTask: adapt, adapt, adapt, cancel, compareAndSetForkJoinTaskTag, completeExceptionally, fork, get, get, getException, getForkJoinTaskTag, getPool, getQueuedTaskCount, getSurplusQueuedTaskCount, helpQuiesce, inForkJoinPool, invoke, invokeAll, invokeAll, invokeAll, isCancelled, isCompletedAbnormally, isCompletedNormally, isDone, join, peekNextLocalTask, pollNextLocalTask, pollTask, quietlyComplete, quietlyInvoke, quietlyJoin, reinitialize, setForkJoinTaskTag, tryUnfork
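public Frame _fr
This Frame instance is the handle for computation over a set of Vec
instances (doAll with Frame and Vec[] instances). Top-level calls to doAll
wrap Vec instances into a new Frame instance and set it into _fr during a
call to dfork.
public Key[] _keys
This Key[] instance is the handle used for computation when an MRTask is
invoked over an array of Key instances.
protected AppendableVec[] _appendables
Appendables are treated separately (roll-ups computed in map/reduce style;
they cannot be passed via the K/V store).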
protected transient RPC<T extends MRTask<T>> _nleft
protected transient RPC<T extends MRTask<T>> _nrite
protected transient boolean _topLocal
protected transient T extends MRTask<T> _left
protected transient T extends MRTask<T> _rite
protected short _nlo
protected short _nhi
protected transient int _lo
protected transient int _hi
protected transient Futures _fs
protected boolean _run_local
public MRTask()
protected MRTask(H2O.H2OCountedCompleter cmp)
protected MRTask(byte prior)
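public AppendableVec[] appendables()
New output vectors; may be null.
public java.lang.String profString()
public T profile()
Used to invoke profiling. For example, where MyTask is a placeholder for a
concrete MRTask subclass and fr a placeholder for the input Frame:
new MyTask().profile().doAll(fr);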
-
outputFrame
public Frame outputFrame()
Get the resulting Frame from this invoked MRTask. This Frame is not
in the DKV. AppendableVec instances are closed into Vec instances,
which then appear in the DKV.
- Returns:
- null if no outputs, otherwise returns the resulting Frame from
the MRTask. The Frame has neither column names nor domains.
-
outputFrame
public Frame outputFrame(java.lang.String[] names,
java.lang.String[][] domains)
Get the resulting Frame from this invoked MRTask. This Frame is not in
the DKV. AppendableVec instances are closed into Vec instances, which
then appear in the DKV.
- Parameters:
names
- The names of the columns in the resulting Frame.
domains
- The domains of the columns in the resulting Frame.
- Returns:
- The result Frame, or null if no outputs
-
outputFrame
public Frame outputFrame(Key<Frame> key,
java.lang.String[] names,
java.lang.String[][] domains)
Get the resulting Frame from this invoked MRTask. If the passed-in key
is not null, then the resulting Frame will appear in the DKV. AppendableVec instances
are closed into Vec instances, which then appear in the DKV.
- Parameters:
key
- If null, then the Frame will not appear in the DKV. Otherwise, this result
will appear in the DKV under this key.
names
- The names of the columns in the resulting Frame.
domains
- The domains of the columns in the resulting Frame.
- Returns:
- null if _noutputs is 0, otherwise returns a Frame.
-
map
public void map(Chunk c)
Override with your map implementation. This overload is given a single
local input Chunk. It is meant for map/reduce jobs that use a
single column in an input Frame. All map variants are called, but only one is
expected to be overridden.
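One possible reading of "all map variants are called, but only one is expected to be overridden" is sketched below: the base class gives every overload an empty default body, and the driver calls the overload that fits the number of columns plus the array form. That dispatch rule is an assumption for illustration, not MRTask's actual code; double[] arrays stand in for Chunks, and MapDispatchSketch and DotProductSketch are hypothetical.

```java
// Hypothetical base class: every map overload defaults to a no-op.
abstract class MapDispatchSketch {
    void map(double[] c) {}
    void map(double[] c0, double[] c1) {}
    void map(double[][] cs) {}

    // Assumed dispatch: call the overload matching the column count, then the array form.
    final void invokeMaps(double[][] cs) {
        if (cs.length == 1) map(cs[0]);
        if (cs.length == 2) map(cs[0], cs[1]);
        map(cs);
    }
}

// Example subclass that overrides only the two-column overload.
class DotProductSketch extends MapDispatchSketch {
    double dot;

    @Override
    void map(double[] c0, double[] c1) {
        for (int i = 0; i < c0.length; i++) dot += c0[i] * c1[i];
    }
}
```

Since the unused overloads do nothing, overriding just one of them is enough for the subclass to take effect.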
-
map
public void map(Chunk c0,
Chunk c1)
Override with your map implementation. This overload is given two
local Chunks. All map variants are called, but only one
is expected to be overridden.
-
map
public void map(Chunk c0,
Chunk c1,
Chunk c2)
Override with your map implementation. This overload is given three
local input Chunks. All map variants are called, but only one
is expected to be overridden.
-
map
public void map(Chunk[] cs)
Override with your map implementation. This overload is given an array
of local input Chunks, for Frames with an arbitrary number of
columns. All map variants are called, but only one is expected to be
overridden.
-
map
public void map(Chunk[] cs,
NewChunk nc)
The handy method to generate a new vector based on existing vectors.
Note: This method is used by Sparkling Water examples.
- Parameters:
cs
- input vectors
nc
- output vector
-
map
public void map(Key key)
Override with your map implementation. Used when doAll is called with
an array of Keys, and called once per Key on the Key's home node.
-
reduce
public void reduce(T mrt)
Override to combine results from 'mrt' into 'this' MRTask. Both 'this'
and 'mrt' are guaranteed to either have map() run on them, or be the
results of a prior reduce(). Reduce is optional if, e.g., the result is
some output vector.
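Because reduce may combine any mix of map outputs and earlier reduce outputs, result fields should be chosen so that partial results merge correctly in any grouping. A common choice, shown in this hypothetical MeanSketch, is to carry a sum and a count rather than a running mean; averaging per-chunk means instead would give the wrong answer when chunks hold different numbers of rows.

```java
// Hypothetical reduce-friendly result: sum and count merge correctly in any order.
class MeanSketch {
    double sum;
    long count;

    void map(double[] chunk) {
        for (double v : chunk) {
            sum += v;
            count++;
        }
    }

    // Valid whether 'other' came straight from map() or from earlier reduce() calls.
    void reduce(MeanSketch other) {
        sum += other.sum;
        count += other.count;
    }

    double mean() { return count == 0 ? Double.NaN : sum / count; }
}
```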
-
setupLocal
protected void setupLocal()
Override to do any remote initialization on the 1st remote instance of
this object, for initializing node-local shared data structures.
-
closeLocal
protected void closeLocal()
Override to do any remote cleaning on the last remote instance of
this object, for disposing of node-local shared data structures.
-
self
protected T self()
-
doAll
public final T doAll(Vec... vecs)
Invokes the map/reduce computation over the given Vecs. This call is
blocking.
-
doAll
public final T doAll(Frame fr,
boolean run_local)
Invokes the map/reduce computation over the given Frame. This call is
blocking.
-
dfork
public void dfork(Key... keys)
-
doAllNodes
public T doAllNodes()
-
asyncExecOnAllNodes
public void asyncExecOnAllNodes()
-
dfork
public final T dfork(byte[] types,
Vec... vecs)
Invokes the map/reduce computation over the given Vec instances and produces
output Vec instances. This call is asynchronous. It returns 'this', on
which getResult may be invoked by the caller to block for pending
computation to complete.
- Parameters:
types
- The type of output Vec instances to create.
vecs
- The input set of Vec instances upon which computation is performed.
- Returns:
- this
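The split between dfork (start now) and getResult (block later) resembles submitting a task to a ForkJoinPool and calling join() when the value is needed. AsyncSketch is a hypothetical analogy in plain JDK code, not the H2O API; its doAll simply forks and then joins.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical analogy for dfork/getResult using JDK types only.
class AsyncSketch {
    // Returns immediately; the sum is computed in the pool.
    static ForkJoinTask<Long> dfork(long[] data) {
        return ForkJoinPool.commonPool().submit(() -> {
            long s = 0;
            for (long v : data) s += v;
            return s;
        });
    }

    // "doAll" = fork, then block until the result is ready.
    static long doAll(long[] data) {
        return dfork(data).join();
    }
}
```

A caller can start several forks, do other work, and only then call join() on each, which is the usage pattern the asynchronous dfork variants allow.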
-
dfork
public final T dfork(Frame fr)
Invokes the map/reduce computation over the given Frame instance. This call is
asynchronous. It returns 'this', on which getResult
may be invoked
by the caller to block for pending computation to complete. This call produces no
output Vec instances or Frame instances.
- Parameters:
fr
- Perform the computation on this Frame instance.
- Returns:
- this
-
dfork
public final T dfork(byte[] types,
Frame fr,
boolean run_local)
Fork the task in strictly non-blocking fashion.
Same functionality as dfork, but does not raise priority, so the user should
*never* block on it.
Because it does not raise priority, these can be tail-call chained together
for any length.
-
getResult
public final T getResult(boolean fjManagedBlock)
Block for and get any final results from a dfork'd MRTask.
Note: the desired name 'get' is final in ForkJoinTask.
-
getResult
public final T getResult()
Block for and get any final results from a dfork'd MRTask.
Note: the desired name 'get' is final in ForkJoinTask.
-
isReleasable
public boolean isReleasable()
Description copied from interface: ForkJoinPool.ManagedBlocker
Returns true
if blocking is unnecessary.
- Specified by:
isReleasable
in interface ForkJoinPool.ManagedBlocker
-
block
public boolean block()
throws java.lang.InterruptedException
Description copied from interface: ForkJoinPool.ManagedBlocker
Possibly blocks the current thread, for example waiting for
a lock or condition.
- Specified by:
block
in interface ForkJoinPool.ManagedBlocker
- Returns:
true
if no additional blocking is necessary
(i.e., if isReleasable would return true)
- Throws:
java.lang.InterruptedException
- if interrupted while waiting
(the method is not required to do so, but is allowed to)
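MRTask implements ForkJoinPool.ManagedBlocker, presumably so that a thread waiting for a result (for example via getResult(true)) can let the ForkJoin pool compensate while it blocks. A minimal sketch of that contract with JDK types follows; ResultBlocker is hypothetical, and the CountDownLatch is an assumed stand-in for however MRTask actually tracks completion.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

// Hypothetical ManagedBlocker: isReleasable() says whether waiting is needed, block() waits.
class ResultBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile long result;

    void complete(long value) {
        result = value;
        done.countDown();
    }

    @Override
    public boolean isReleasable() {
        return done.getCount() == 0; // true once the result is available
    }

    @Override
    public boolean block() throws InterruptedException {
        done.await();                // wait until complete(...) is called
        return true;                 // no further blocking is necessary
    }

    // managedBlock lets a ForkJoinPool add a spare thread while this one waits.
    long getResult() {
        try {
            ForkJoinPool.managedBlock(this);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return result;
    }
}
```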
-
dinvoke
public final void dinvoke(H2ONode sender)
Called once on remote at top level, probably with a subset of the cloud.
Called internally by D/F/J. Not expected to be user-called.
-
modifiesVolatileVecs
protected boolean modifiesVolatileVecs()
-
compute2
public final void compute2()
Called from FJ threads to do local work. The first called Task (which is
also the last one to Complete) also reduces any global work. Called
internally by F/J. Not expected to be user-called.
- Overrides:
compute2
in class H2O.H2OCountedCompleter<T extends MRTask<T>>
-
onCompletion
public final void onCompletion(CountedCompleter caller)
OnCompletion: reduce the left and right into self. Called internally by
F/J. Not expected to be user-called.
- Overrides:
onCompletion
in class CountedCompleter
- Parameters:
caller
- the task invoking this method (which may
be this task itself).
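onCompletion belongs to the CountedCompleter pattern from java.util.concurrent: a task records how many completions it is waiting for, and when the last one arrives, onCompletion runs on that task and can combine its children's results. The sketch below uses the JDK's CountedCompleter rather than the copy bundled with H2O; SumCompleter is hypothetical and only mirrors the left/rite naming.

```java
import java.util.concurrent.CountedCompleter;

// Hypothetical CountedCompleter that sums an array and reduces children into self.
class SumCompleter extends CountedCompleter<Long> {
    private final long[] data;
    private final int lo, hi;         // half-open range [lo, hi)
    private SumCompleter left, rite;
    long result;

    SumCompleter(CountedCompleter<?> parent, long[] data, int lo, int hi) {
        super(parent);
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    public void compute() {
        if (hi - lo <= 2) {           // small range: "map" it directly
            for (int i = lo; i < hi; i++) result += data[i];
        } else {
            int mid = (lo + hi) >>> 1;
            left = new SumCompleter(this, data, lo, mid);
            rite = new SumCompleter(this, data, mid, hi);
            setPendingCount(2);       // set before forking: one per child
            left.fork();
            rite.fork();
        }
        tryComplete();                // onCompletion fires after self and both children report
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        if (left != null) result = left.result + rite.result; // reduce left and right into self
    }

    @Override
    public Long getRawResult() { return result; }

    static long sum(long[] data) {
        return new SumCompleter(null, data, 0, data.length).invoke();
    }
}
```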
-
postGlobal
protected void postGlobal()
-
onExceptionalCompletion
public final boolean onExceptionalCompletion(java.lang.Throwable ex,
CountedCompleter caller)
Cancel/kill all work as we can, then rethrow... do not invisibly swallow
exceptions (which is the F/J default). Called internally by F/J. Not
expected to be user-called.
- Overrides:
onExceptionalCompletion
in class CountedCompleter
- Parameters:
ex
- the exception
caller
- the task invoking this method (which may
be this task itself).
- Returns:
- true if this exception should be propagated to this
task's completer, if one exists.