public abstract class CountedCompleter extends ForkJoinTask<java.lang.Void>
ForkJoinTask
with a completion action
performed when triggered and there are no remaining pending
actions. Uses of CountedCompleter are similar to those of other
completion based components (such as CompletionHandler
) except that multiple
pending completions may be necessary to trigger the onCompletion(jsr166y.CountedCompleter)
action, not just one. Unless initialized otherwise,
the pending count
starts at zero, but may
be (atomically) changed using methods setPendingCount(int)
,
addToPendingCount(int)
, and compareAndSetPendingCount(int, int)
. Upon invocation of tryComplete()
, if the pending action count is nonzero, it is
decremented; otherwise, the completion action is performed, and if
this completer itself has a completer, the process is continued
with its completer. As is the case with related synchronization
components such as Phaser
and Semaphore
these methods affect only internal
counts; they do not establish any further internal bookkeeping. In
particular, the identities of pending tasks are not maintained. As
illustrated below, you can create subclasses that do record some or
all pended tasks or their results when needed.
A concrete CountedCompleter class must define method compute()
, that should, in almost all use cases, invoke tryComplete()
once before returning. The class may also optionally
override method onCompletion(jsr166y.CountedCompleter)
to perform an action upon
normal completion, and method onExceptionalCompletion(java.lang.Throwable, jsr166y.CountedCompleter)
to
perform an action upon any exception.
A CountedCompleter that does not itself have a completer (i.e.,
one for which getCompleter()
returns null
) can be
used as a regular ForkJoinTask with this added functionality.
However, any completer that in turn has another completer serves
only as an internal helper for other computations, so its own task
status (as reported in methods such as ForkJoinTask.isDone()
)
is arbitrary; this status changes only upon explicit invocations of
complete(java.lang.Void)
, ForkJoinTask.cancel(boolean)
, ForkJoinTask.completeExceptionally(java.lang.Throwable)
or upon exceptional completion
of method compute
. Upon any exceptional completion, the
exception may be relayed to a task's completer (and its completer,
and so on), if one exists and it has not otherwise already
completed.
Sample Usages.
Parallel recursive decomposition. CountedCompleters may
be arranged in trees similar to those often used with RecursiveAction
s, although the constructions involved in setting
them up typically vary. Even though they entail a bit more
bookkeeping, CountedCompleters may be better choices when applying
a possibly time-consuming operation (that cannot be further
subdivided) to each element of an array or collection; especially
when the operation takes a significantly different amount of time
to complete for some elements than others, either because of
intrinsic variation (for example IO) or auxiliary effects such as
garbage collection. Because CountedCompleters provide their own
continuations, other threads need not block waiting to perform
them.
For example, here is an initial version of a class that uses
divide-by-two recursive decomposition to divide work into single
pieces (leaf tasks). Even when work is split into individual calls,
tree-based techniques are usually preferable to directly forking
leaf tasks, because they reduce inter-thread communication and
improve load balancing. In the recursive case, the second of each
pair of subtasks to finish triggers completion of its parent
(because no result combination is performed, the default no-op
implementation of method onCompletion
is not overridden). A
static utility method sets up the base task and invokes it:
class MyOperation<E> { void apply(E e) { ... } }
class ForEach<E> extends CountedCompleter {
public static <E> void forEach(ForkJoinPool pool, E[] array, MyOperation<E> op) {
pool.invoke(new ForEach<E>(null, array, op, 0, array.length));
}
final E[] array; final MyOperation<E> op; final int lo, hi;
ForEach(CountedCompleter p, E[] array, MyOperation<E> op, int lo, int hi) {
super(p);
this.array = array; this.op = op; this.lo = lo; this.hi = hi;
}
public void compute() { // version 1
if (hi - lo >= 2) {
int mid = (lo + hi) >>> 1;
setPendingCount(2); // must set pending count before fork
new ForEach(this, array, op, mid, hi).fork(); // right child
new ForEach(this, array, op, lo, mid).fork(); // left child
}
else if (hi > lo)
op.apply(array[lo]);
tryComplete();
}
}
This design can be improved by noticing that in the recursive case,
the task has nothing to do after forking its right task, so can
directly invoke its left task before returning. (This is an analog
of tail recursion removal.) Also, because the task returns upon
executing its left task (rather than falling through to invoke
tryComplete) the pending count is set to one:
class ForEach<E> ...
public void compute() { // version 2
if (hi - lo >= 2) {
int mid = (lo + hi) >>> 1;
setPendingCount(1); // only one pending
new ForEach(this, array, op, mid, hi).fork(); // right child
new ForEach(this, array, op, lo, mid).compute(); // direct invoke
}
else {
if (hi > lo)
op.apply(array[lo]);
tryComplete();
}
}
As a further improvement, notice that the left task need not even
exist. Instead of creating a new one, we can iterate using the
original task, and add a pending count for each fork:
class ForEach<E> ...
public void compute() { // version 3
int l = lo, h = hi;
while (h - l >= 2) {
int mid = (l + h) >>> 1;
addToPendingCount(1);
new ForEach(this, array, op, mid, h).fork(); // right child
h = mid;
}
if (h > l)
op.apply(array[l]);
tryComplete();
}
Additional improvements of such classes might entail precomputing
pending counts so that they can be established in constructors,
specializing classes for leaf steps, subdividing by say, four,
instead of two per iteration, and using an adaptive threshold
instead of always subdividing down to single elements.
Recording subtasks. CountedCompleter tasks that combine
results of multiple subtasks usually need to access these results
in method onCompletion(jsr166y.CountedCompleter)
. As illustrated in the following
class (that performs a simplified form of map-reduce where mappings
and reductions are all of type E
), one way to do this in
divide and conquer designs is to have each subtask record its
sibling, so that it can be accessed in method onCompletion
.
For clarity, this class uses explicit left and right subtasks, but
variants of other streamlinings seen in the above example may also
apply.
class MyMapper<E> { E apply(E v) { ... } }
class MyReducer<E> { E apply(E x, E y) { ... } }
class MapReducer<E> extends CountedCompleter {
final E[] array; final MyMapper<E> mapper;
final MyReducer<E> reducer; final int lo, hi;
MapReducer sibling;
E result;
MapReducer(CountedCompleter p, E[] array, MyMapper<E> mapper,
MyReducer<E> reducer, int lo, int hi) {
super(p);
this.array = array; this.mapper = mapper;
this.reducer = reducer; this.lo = lo; this.hi = hi;
}
public void compute() {
if (hi - lo >= 2) {
int mid = (lo + hi) >>> 1;
MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
left.sibling = right;
right.sibling = left;
setPendingCount(1); // only right is pending
right.fork();
left.compute(); // directly execute left
}
else {
if (hi > lo)
result = mapper.apply(array[lo]);
tryComplete();
}
}
public void onCompletion(CountedCompleter caller) {
if (caller != this) {
MapReducer<E> child = (MapReducer<E>)caller;
MapReducer<E> sib = child.sibling;
if (sib == null || sib.result == null)
result = child.result;
else
result = reducer.apply(child.result, sib.result);
}
}
public static <E> E mapReduce(ForkJoinPool pool, E[] array,
MyMapper<E> mapper, MyReducer<E> reducer) {
MapReducer<E> mr = new MapReducer<E>(null, array, mapper,
reducer, 0, array.length);
pool.invoke(mr);
return mr.result;
}
}
Triggers. Some CountedCompleters are themselves never forked, but instead serve as bits of plumbing in other designs; including those in which the completion of one of more async tasks triggers another async task. For example:
class HeaderBuilder extends CountedCompleter { ... }
class BodyBuilder extends CountedCompleter { ... }
class PacketSender extends CountedCompleter {
PacketSender(...) { super(null, 1); ... } // trigger on second completion
public void compute() { } // never called
public void onCompletion(CountedCompleter caller) { sendPacket(); }
}
// sample use:
PacketSender p = new PacketSender();
new HeaderBuilder(p, ...).fork();
new BodyBuilder(p, ...).fork();
Modifier | Constructor and Description |
---|---|
protected |
CountedCompleter()
Creates a new CountedCompleter with no completer
and an initial pending count of zero.
|
protected |
CountedCompleter(CountedCompleter completer)
Creates a new CountedCompleter with the given completer
and an initial pending count of zero.
|
protected |
CountedCompleter(CountedCompleter completer,
int initialPendingCount)
Creates a new CountedCompleter with the given completer
and initial pending count.
|
Modifier and Type | Method and Description |
---|---|
void |
__tryComplete(CountedCompleter caller)
H2O Hack to get distributed FJ behavior closer to the behavior on local node.
|
void |
addToPendingCount(int delta)
Adds (atomically) the given value to the pending count.
|
boolean |
compareAndSetPendingCount(int expected,
int count)
Sets (atomically) the pending count to the given count only if
it currently holds the given expected value.
|
void |
complete(java.lang.Void mustBeNull)
Regardless of pending count, invokes
onCompletion(jsr166y.CountedCompleter) ,
marks this task as complete with a null return value,
and further triggers tryComplete() on this task's
completer, if one exists. |
abstract void |
compute()
The main computation performed by this task.
|
protected boolean |
exec()
Implements execution conventions for CountedCompleters
|
CountedCompleter |
getCompleter()
Returns the completer established in this task's constructor,
or
null if none. |
int |
getPendingCount()
Returns the current pending count.
|
java.lang.Void |
getRawResult()
Always returns
null . |
void |
onCompletion(CountedCompleter caller)
Performs an action when method
tryComplete() is invoked
and there are no pending counts, or when the unconditional
method complete(java.lang.Void) is invoked. |
boolean |
onExceptionalCompletion(java.lang.Throwable ex,
CountedCompleter caller)
Performs an action when method
ForkJoinTask.completeExceptionally(java.lang.Throwable)
is invoked or method compute() throws an exception, and
this task has not otherwise already completed normally. |
void |
setCompleter(CountedCompleter x) |
void |
setPendingCount(int count)
Sets the pending count to the given value.
|
protected void |
setRawResult(java.lang.Void mustBeNull)
Requires null completion value.
|
void |
tryComplete()
If the pending count is nonzero, decrements the count;
otherwise invokes
onCompletion(jsr166y.CountedCompleter) and then similarly
tries to complete this task's completer, if one exists,
else marks this task as complete. |
adapt, adapt, adapt, cancel, compareAndSetForkJoinTaskTag, completeExceptionally, fork, get, get, get, getException, getForkJoinTaskTag, getPool, getQueuedTaskCount, getSurplusQueuedTaskCount, helpQuiesce, inForkJoinPool, invoke, invokeAll, invokeAll, invokeAll, isCancelled, isCompletedAbnormally, isCompletedNormally, isDone, join, peekNextLocalTask, pollNextLocalTask, pollTask, quietlyComplete, quietlyInvoke, quietlyJoin, reinitialize, setForkJoinTaskTag, tryUnfork
protected CountedCompleter(CountedCompleter completer, int initialPendingCount)
completer
- this tasks completer, or null
if noneinitialPendingCount
- the initial pending countprotected CountedCompleter(CountedCompleter completer)
completer
- this tasks completer, or null
if noneprotected CountedCompleter()
public abstract void compute()
public void onCompletion(CountedCompleter caller)
tryComplete()
is invoked
and there are no pending counts, or when the unconditional
method complete(java.lang.Void)
is invoked. By default, this method
does nothing.caller
- the task invoking this method (which may
be this task itself).public boolean onExceptionalCompletion(java.lang.Throwable ex, CountedCompleter caller)
ForkJoinTask.completeExceptionally(java.lang.Throwable)
is invoked or method compute()
throws an exception, and
this task has not otherwise already completed normally. On
entry to this method, this task ForkJoinTask.isCompletedAbnormally()
. The return value of this
method controls further propagation: If true
and this
task has a completer, then this completer is also completed
exceptionally. The default implementation of this method does
nothing except return true
.ex
- the exceptioncaller
- the task invoking this method (which may
be this task itself).public final CountedCompleter getCompleter()
null
if none.public final void setCompleter(CountedCompleter x)
public final int getPendingCount()
public final void setPendingCount(int count)
count
- the countpublic final void addToPendingCount(int delta)
delta
- the value to addpublic final boolean compareAndSetPendingCount(int expected, int count)
expected
- the expected valuecount
- the new valuepublic final void tryComplete()
onCompletion(jsr166y.CountedCompleter)
and then similarly
tries to complete this task's completer, if one exists,
else marks this task as complete.public final void __tryComplete(CountedCompleter caller)
caller
- - The child task completing thispublic void complete(java.lang.Void mustBeNull)
onCompletion(jsr166y.CountedCompleter)
,
marks this task as complete with a null
return value,
and further triggers tryComplete()
on this task's
completer, if one exists. This method may be useful when
forcing completion as soon as any one (versus all) of several
subtask results are obtained.complete
in class ForkJoinTask<java.lang.Void>
mustBeNull
- the null
completion valueprotected final boolean exec()
exec
in class ForkJoinTask<java.lang.Void>
true
if this task is known to have completed normallypublic final java.lang.Void getRawResult()
null
.getRawResult
in class ForkJoinTask<java.lang.Void>
null
alwaysprotected final void setRawResult(java.lang.Void mustBeNull)
setRawResult
in class ForkJoinTask<java.lang.Void>
mustBeNull
- the value