|
FlexDoc/Javadoc 2.0 Demo Java Doc |
Registration. Unlike the case for other barriers, the number of parties registered to synchronize on a phaser may vary over time. Tasks may be registered at any time (using methods register(), bulkRegister(int), or forms of constructors establishing initial numbers of parties), and optionally deregistered upon any arrival (using arriveAndDeregister()). As is the case with most basic synchronization constructs, registration and deregistration affect only internal counts; they do not establish any further internal bookkeeping, so tasks cannot query whether they are registered. (However, you can introduce such bookkeeping by subclassing this class.)
Synchronization. Like a CyclicBarrier, a Phaser may be repeatedly awaited. Method arriveAndAwaitAdvance() has effect analogous to CyclicBarrier.await. Each generation of a phaser has an associated phase number. The phase number starts at zero, and advances when all parties arrive at the phaser, wrapping around to zero after reaching Integer.MAX_VALUE. The use of phase numbers enables independent control of actions upon arrival at a phaser and upon awaiting others, via two kinds of methods that may be invoked by any registered party:
Termination. A phaser may enter a termination state, that may be checked using method isTerminated(). Upon termination, all synchronization methods immediately return without waiting for advance, as indicated by a negative return value. Similarly, attempts to register upon termination have no effect. Termination is triggered when an invocation of onAdvance returns true. The default implementation returns true if a deregistration has caused the number of registered parties to become zero. As illustrated below, when phasers control actions with a fixed number of iterations, it is often convenient to override this method to cause termination when the current phase number reaches a threshold. Method forceTermination() is also available to abruptly release waiting threads and allow them to terminate.
Tiering. Phasers may be tiered (i.e., constructed in tree structures) to reduce contention. Phasers with large numbers of parties that would otherwise experience heavy synchronization contention costs may instead be set up so that groups of sub-phasers share a common parent. This may greatly increase throughput even though it incurs greater per-operation overhead.
In a tree of tiered phasers, registration and deregistration of child phasers with their parent are managed automatically. Whenever the number of registered parties of a child phaser becomes non-zero (as established in the Phaser(Phaser,int) constructor, register(), or bulkRegister(int)), the child phaser is registered with its parent. Whenever the number of registered parties becomes zero as the result of an invocation of arriveAndDeregister(), the child phaser is deregistered from its parent.
Monitoring. While synchronization methods may be invoked only by registered parties, the current state of a phaser may be monitored by any caller. At any given moment there are getRegisteredParties() parties in total, of which getArrivedParties() have arrived at the current phase (getPhase()). When the remaining (getUnarrivedParties()) parties arrive, the phase advances. The values returned by these methods may reflect transient states and so are not in general useful for synchronization control. Method toString() returns snapshots of these state queries in a form convenient for informal monitoring.
Memory consistency effects: Actions prior to any form of arrive method happen-before a corresponding phase advance and onAdvance actions (if present), which in turn happen-before actions following the phase advance.
Sample usages:
A Phaser may be used instead of a CountDownLatch to control a one-shot action serving a variable number of parties. The typical idiom is for the method setting this up to first register, then start all the actions, then deregister, as in:
void runTasks(List<Runnable> tasks) {
Phaser startingGate = new Phaser(1); // "1" to register self
// create and start threads
for (Runnable task : tasks) {
startingGate.register();
new Thread(() -> {
startingGate.arriveAndAwaitAdvance();
task.run();
}).start();
}
// deregister self to allow threads to proceed
startingGate.arriveAndDeregister();
}
One way to cause a set of threads to repeatedly perform actions for a given number of iterations is to override onAdvance:
void startTasks(List<Runnable> tasks, int iterations) {
Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations - 1 || registeredParties == 0;
}
};
phaser.register();
for (Runnable task : tasks) {
phaser.register();
new Thread(() -> {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}).start();
}
// allow threads to proceed; don't wait for them
phaser.arriveAndDeregister();
}
If the main task must later await termination, it
may re-register and then execute a similar loop:
// ...
phaser.register();
while (!phaser.isTerminated())
phaser.arriveAndAwaitAdvance();
Related constructions may be used to await particular phase numbers in contexts where you are sure that the phase will never wrap around Integer.MAX_VALUE. For example:
void awaitPhase(Phaser phaser, int phase) {
int p = phaser.register(); // assumes caller not already registered
while (p < phase) {
if (phaser.isTerminated())
// ... deal with unexpected termination
else
p = phaser.arriveAndAwaitAdvance();
}
phaser.arriveAndDeregister();
}
To create a set of n tasks using a tree of phasers, you could use code of the following form, assuming a Task class with a constructor accepting a Phaser that it registers with upon construction. After invocation of build(new Task[n], 0, n, new Phaser()), these tasks could then be started, for example by submitting to a pool:
void build(Task[] tasks, int lo, int hi, Phaser ph) {
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
build(tasks, i, j, new Phaser(ph));
}
} else {
for (int i = lo; i < hi; ++i)
tasks[i] = new Task(ph);
// assumes new Task(ph) performs ph.register()
}
}
The best value of TASKS_PER_PHASER depends mainly on
expected synchronization rates. A value as low as four may
be appropriate for extremely small per-phase task bodies (thus
high rates), or up to hundreds for extremely large ones.
Implementation notes: This implementation restricts the maximum number of parties to 65535. Attempts to register additional parties result in IllegalStateException. However, you can and should create tiered phasers to accommodate arbitrarily large sets of participants.
Constructor Summary |
||
Phaser()
Creates a new phaser with no initially registered parties, no
parent, and initial phase number 0.
|
||
Phaser(int parties)
Creates a new phaser with the given number of registered
unarrived parties, no parent, and initial phase number 0.
|
||
Equivalent to Phaser(parent, 0).
|
||
Creates a new phaser with the given parent and number of
registered unarrived parties.
|
Method Summary |
||
int |
arrive()
Arrives at this phaser, without waiting for others to arrive.
|
|
int |
Arrives at this phaser and awaits others.
|
|
int |
Arrives at this phaser and deregisters from it without waiting
for others to arrive.
|
|
int |
awaitAdvance(int phase)
Awaits the phase of this phaser to advance from the given phase
value, returning immediately if the current phase is not equal
to the given phase value or this phaser is terminated.
|
|
int |
awaitAdvanceInterruptibly(int phase)
Awaits the phase of this phaser to advance from the given phase
value, throwing InterruptedException if interrupted
while waiting, or returning immediately if the current phase is
not equal to the given phase value or this phaser is
terminated.
|
|
int |
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
Awaits the phase of this phaser to advance from the given phase
value or the given timeout to elapse, throwing
InterruptedException if interrupted while waiting, or
returning immediately if the current phase is not equal to the
given phase value or this phaser is terminated.
|
|
int |
bulkRegister(int parties)
Adds the given number of new unarrived parties to this phaser.
|
|
void |
Forces this phaser to enter termination state.
|
|
int |
Returns the number of registered parties that have arrived at
the current phase of this phaser.
|
|
Returns the parent of this phaser, or null if none.
|
||
final int |
getPhase()
Returns the current phase number.
|
|
int |
Returns the number of parties registered at this phaser.
|
|
getRoot()
Returns the root ancestor of this phaser, which is the same as
this phaser if it has no parent.
|
||
int |
Returns the number of registered parties that have not yet
arrived at the current phase of this phaser.
|
|
boolean |
Returns true if this phaser has been terminated.
|
|
protected boolean |
onAdvance(int phase, int registeredParties)
Overridable method to perform an action upon impending phase
advance, and to control termination.
|
|
int |
register()
Adds a new unarrived party to this phaser.
|
|
toString()
Returns a string identifying this phaser, as well as its
state.
|
Methods inherited from class java.lang.Object |
public Phaser |
() |
public Phaser |
(int parties) |
public Phaser |
(Phaser parent) |
public Phaser |
(Phaser parent, int parties) |
public int register |
() |
public int bulkRegister |
(int parties) |
public int arrive |
() |
It is a usage error for an unregistered party to invoke this method. However, this error may result in an IllegalStateException only upon some subsequent operation on this phaser, if ever.
public int arriveAndDeregister |
() |
It is a usage error for an unregistered party to invoke this method. However, this error may result in an IllegalStateException only upon some subsequent operation on this phaser, if ever.
public int arriveAndAwaitAdvance |
() |
It is a usage error for an unregistered party to invoke this method. However, this error may result in an IllegalStateException only upon some subsequent operation on this phaser, if ever.
public int awaitAdvance |
(int phase) |
public int awaitAdvanceInterruptibly |
(int phase) |
throws |
public int awaitAdvanceInterruptibly |
|
throws |
public void forceTermination |
() |
public final int getPhase |
() |
public int getRegisteredParties |
() |
public int getArrivedParties |
() |
public int getUnarrivedParties |
() |
public Phaser getParent |
() |
public Phaser getRoot |
() |
public boolean isTerminated |
() |
protected boolean onAdvance |
(int phase, int registeredParties) |
The arguments to this method provide the state of the phaser prevailing for the current transition. The effects of invoking arrival, registration, and waiting methods on this phaser from within onAdvance are unspecified and should not be relied on.
If this phaser is a member of a tiered set of phasers, then onAdvance is invoked only for its root phaser on each advance.
To support the most common use cases, the default implementation of this method returns true when the number of registered parties has become zero as the result of a party invoking arriveAndDeregister. You can disable this behavior, thus enabling continuation upon future registrations, by overriding this method to always return false:
Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int parties) { return false; }
};
public String toString |
() |
|
FlexDoc/Javadoc 2.0 Demo Java Doc |