FlexDoc/Javadoc 2.0 Demo Java Doc
A SubmissionPublisher uses the Executor supplied in its constructor for delivery to subscribers. The best choice of Executor depends on expected usage. If the generator(s) of submitted items run in separate threads, and the number of subscribers can be estimated, consider using Executors.newFixedThreadPool(int). Otherwise consider using the default, normally the ForkJoinPool.commonPool().
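As a rough illustration of the advice above, the following sketch sizes a fixed thread pool to an assumed subscriber count of two and passes it to the two-argument constructor. The class name FixedPoolPublisherDemo, the method publishToTwo, and the subscriber count are hypothetical choices made for this example only.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FixedPoolPublisherDemo {
    /** Publishes 1..count to two consumers and returns what each one received. */
    static List<List<Integer>> publishToTwo(int count) {
        int expectedSubscribers = 2; // assumption: subscriber count is known up front
        ExecutorService pool = Executors.newFixedThreadPool(expectedSubscribers);
        List<Integer> first = new CopyOnWriteArrayList<>();
        List<Integer> second = new CopyOnWriteArrayList<>();
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(pool, Flow.defaultBufferSize())) {
            CompletableFuture<Void> doneFirst = publisher.consume(first::add);
            CompletableFuture<Void> doneSecond = publisher.consume(second::add);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks only while a subscriber's buffer is full
            }
            publisher.close(); // issues onComplete, which completes both futures
            CompletableFuture.allOf(doneFirst, doneSecond)
                .orTimeout(5, TimeUnit.SECONDS)
                .join();
        } finally {
            pool.shutdown();
        }
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(publishToTwo(3));
    }
}
```

The pool is owned by the caller here, so the caller also shuts it down; closing the publisher does not do that.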
Buffering allows producers and consumers to transiently operate at different rates. Each subscriber uses an independent buffer. Buffers are created upon first use and expanded as needed up to the given maximum. (The enforced capacity may be rounded up to the nearest power of two and/or bounded by the largest value supported by this implementation.) Invocations of request do not directly result in buffer expansion, but risk saturation if unfilled requests exceed the maximum capacity. The default value of Flow.defaultBufferSize() may provide a useful starting point for choosing a capacity based on expected rates, resources, and usages.
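To see how a requested capacity relates to the enforced one, a small sketch can compare the constructor argument with the value reported by getMaxBufferCapacity(). The class and method names below are hypothetical, and the exact rounding is implementation-dependent, so the example prints the value rather than assuming it.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

public class BufferCapacityDemo {
    /** Returns the per-subscriber capacity the publisher actually enforces. */
    static int enforcedCapacity(int requested) {
        try (SubmissionPublisher<String> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), requested)) {
            return publisher.getMaxBufferCapacity();
        }
    }

    public static void main(String[] args) {
        System.out.println("Flow.defaultBufferSize() = " + Flow.defaultBufferSize());
        // The enforced value may be rounded up from the requested one.
        System.out.println("requested 100, enforced " + enforcedCapacity(100));
    }
}
```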
A single SubmissionPublisher may be shared among multiple sources. Actions in a source thread prior to publishing an item or issuing a signal happen-before actions subsequent to the corresponding access by each subscriber. But reported estimates of lag and demand are designed for use in monitoring, not for synchronization control, and may reflect stale or inaccurate views of progress.
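The monitoring methods mentioned above can be sampled as in the sketch below. It uses a hypothetical IdleSubscriber that never requests items, so submitted items stay buffered; the values returned are estimates and should only be logged or charted, not used to coordinate threads.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class LagMonitorDemo {
    /** A subscriber that requests nothing, so submitted items remain buffered. */
    static final class IdleSubscriber implements Flow.Subscriber<String> {
        @Override public void onSubscribe(Flow.Subscription subscription) { }
        @Override public void onNext(String item) { }
        @Override public void onError(Throwable error) { }
        @Override public void onComplete() { }
    }

    /** Returns { subscriber count, estimated maximum lag, estimated minimum demand }. */
    static long[] snapshot(int itemsToSubmit) {
        // Keep itemsToSubmit well below the buffer capacity: submit would
        // otherwise block, because IdleSubscriber never drains its buffer.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new IdleSubscriber());
            for (int i = 0; i < itemsToSubmit; i++) {
                publisher.submit("item-" + i);
            }
            return new long[] {
                publisher.getNumberOfSubscribers(),
                publisher.estimateMaximumLag(),
                publisher.estimateMinimumDemand()
            };
        }
    }

    public static void main(String[] args) {
        long[] s = snapshot(3);
        System.out.println("subscribers=" + s[0] + " maxLag~" + s[1] + " minDemand~" + s[2]);
    }
}
```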
Publication methods support different policies about what to do when buffers are saturated. Method submit blocks until resources are available. This is simplest, but least responsive. The offer methods may drop items (either immediately or with bounded timeout), but provide an opportunity to interpose a handler and then retry.
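The non-blocking policy can be sketched as follows. The example forces saturation with a hypothetical StalledSubscriber that never requests items and a deliberately small buffer, then counts drops reported by offer and invocations of the drop handler. The handler returns false, meaning no retry is attempted.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class OfferDropDemo {
    /** Never requests items, so its buffer eventually saturates. */
    static final class StalledSubscriber implements Flow.Subscriber<Integer> {
        @Override public void onSubscribe(Flow.Subscription subscription) { }
        @Override public void onNext(Integer item) { }
        @Override public void onError(Throwable error) { }
        @Override public void onComplete() { }
    }

    /** Returns { drops reported by offer, drop-handler invocations }. */
    static int[] offerMany(int attempts) {
        AtomicInteger handlerCalls = new AtomicInteger();
        int drops = 0;
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), 4)) {
            publisher.subscribe(new StalledSubscriber());
            for (int i = 0; i < attempts; i++) {
                int status = publisher.offer(i, (subscriber, item) -> {
                    handlerCalls.incrementAndGet();
                    return false; // false: give up on this item, do not retry
                });
                if (status < 0) {
                    drops += -status; // negative status is the number of drops
                }
            }
        }
        return new int[] { drops, handlerCalls.get() };
    }

    public static void main(String[] args) {
        int[] result = offerMany(100);
        System.out.println("drops=" + result[0] + " handlerCalls=" + result[1]);
    }
}
```

Replacing offer with submit in this setup would block the producer indefinitely once the buffer fills, which is the trade-off the paragraph above describes.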
If any Subscriber method throws an exception, its subscription is cancelled. If a handler is supplied as a constructor argument, it is invoked before cancellation upon an exception in method onNext, but exceptions in methods onSubscribe, onError and onComplete are not recorded or handled before cancellation. If the supplied Executor throws RejectedExecutionException (or any other RuntimeException or Error) when attempting to execute a task, or a drop handler throws an exception when processing a dropped item, then the exception is rethrown. In these cases, not all subscribers will have been issued the published item. It is usually good practice to closeExceptionally in these cases.
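A minimal sketch of the onNext handler, assuming a hypothetical FailingSubscriber whose onNext always throws: the three-argument constructor receives a handler that captures the exception before the subscription is cancelled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class OnNextHandlerDemo {
    /** Requests everything, then fails on the first item it receives. */
    static final class FailingSubscriber implements Flow.Subscriber<String> {
        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }
        @Override public void onNext(String item) {
            throw new IllegalStateException("cannot process " + item);
        }
        @Override public void onError(Throwable error) { }
        @Override public void onComplete() { }
    }

    /** Returns the exception that the handler observed. */
    static Throwable captureOnNextFailure() {
        CompletableFuture<Throwable> seen = new CompletableFuture<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<String>(
                 pool, Flow.defaultBufferSize(),
                 (subscriber, error) -> seen.complete(error))) {
            publisher.subscribe(new FailingSubscriber());
            publisher.submit("a");
            // Wait for the asynchronous delivery to fail and reach the handler.
            return seen.orTimeout(5, TimeUnit.SECONDS).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("handler saw: " + captureOnNextFailure());
    }
}
```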
Method consume(Consumer) simplifies support for a common case in which the only action of a subscriber is to request and process all items using a supplied function.
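For instance, a sketch along these lines sums a list of integers with consume. ConsumeDemo and its sum method are hypothetical names; the returned CompletableFuture completes once the publisher has been closed and all items have been processed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumeDemo {
    /** Publishes every item and returns the total computed by the consumer. */
    static int sum(List<Integer> items) {
        AtomicInteger total = new AtomicInteger();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            // Subscribe first: items submitted with no subscribers are discarded.
            CompletableFuture<Void> done = publisher.consume(total::addAndGet);
            items.forEach(publisher::submit);
            publisher.close(); // onComplete completes the future
            done.orTimeout(5, TimeUnit.SECONDS).join();
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(sum(List.of(1, 2, 3)));
    }
}
```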
This class may also serve as a convenient base for subclasses that generate items and use the methods in this class to publish them. For example, here is a class that periodically publishes the items generated from a supplier. (In practice you might add methods to independently start and stop generation, to share Executors among publishers, and so on, or use a SubmissionPublisher as a component rather than a superclass.)
    import java.util.concurrent.*;
    import java.util.function.Supplier;

    class PeriodicPublisher<T> extends SubmissionPublisher<T> {
        final ScheduledFuture<?> periodicTask;
        final ScheduledExecutorService scheduler;

        PeriodicPublisher(Executor executor, int maxBufferCapacity,
                          Supplier<? extends T> supplier,
                          long period, TimeUnit unit) {
            super(executor, maxBufferCapacity);
            scheduler = new ScheduledThreadPoolExecutor(1);
            // Publish one supplied item per period, starting immediately.
            periodicTask = scheduler.scheduleAtFixedRate(
                () -> submit(supplier.get()), 0, period, unit);
        }

        @Override
        public void close() {
            // Stop generating items before completing the subscribers.
            periodicTask.cancel(false);
            scheduler.shutdown();
            super.close();
        }
    }
Here is an example of a Flow.Processor implementation. It uses single-step requests to its publisher for simplicity of illustration. A more adaptive version could monitor flow using the lag estimate returned from submit, along with other utility methods.
    import java.util.concurrent.Executor;
    import java.util.concurrent.Flow;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.function.Function;

    class TransformProcessor<S,T> extends SubmissionPublisher<T>
            implements Flow.Processor<S,T> {
        final Function<? super S, ? extends T> function;
        Flow.Subscription subscription;

        TransformProcessor(Executor executor, int maxBufferCapacity,
                           Function<? super S, ? extends T> function) {
            super(executor, maxBufferCapacity);
            this.function = function;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Single-step flow control: ask for one item at a time.
            (this.subscription = subscription).request(1);
        }

        @Override
        public void onNext(S item) {
            subscription.request(1);
            submit(function.apply(item));
        }

        @Override
        public void onError(Throwable ex) { closeExceptionally(ex); }

        @Override
        public void onComplete() { close(); }
    }
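One possible way to wire such a processor between a source publisher and a consumer is sketched below. The TransformProcessor from the listing above is repeated as a nested class so that the file compiles on its own; TransformPipelineDemo, the run method, and the "item-" mapping are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class TransformPipelineDemo {
    /** Same shape as the TransformProcessor shown in the documentation. */
    static class TransformProcessor<S, T> extends SubmissionPublisher<T>
            implements Flow.Processor<S, T> {
        final Function<? super S, ? extends T> function;
        Flow.Subscription subscription;

        TransformProcessor(Executor executor, int maxBufferCapacity,
                           Function<? super S, ? extends T> function) {
            super(executor, maxBufferCapacity);
            this.function = function;
        }
        @Override public void onSubscribe(Flow.Subscription subscription) {
            (this.subscription = subscription).request(1);
        }
        @Override public void onNext(S item) {
            subscription.request(1);
            submit(function.apply(item));
        }
        @Override public void onError(Throwable ex) { closeExceptionally(ex); }
        @Override public void onComplete() { close(); }
    }

    /** Sends inputs through source -> processor -> consumer and returns the results. */
    static List<String> run(List<Integer> inputs) {
        List<String> results = new CopyOnWriteArrayList<>();
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
             TransformProcessor<Integer, String> processor =
                 new TransformProcessor<Integer, String>(
                     ForkJoinPool.commonPool(), Flow.defaultBufferSize(),
                     n -> "item-" + n)) {
            // Attach the downstream consumer before any item flows.
            CompletableFuture<Void> done = processor.consume(results::add);
            source.subscribe(processor);
            inputs.forEach(source::submit);
            source.close(); // propagates onComplete through the processor
            done.orTimeout(5, TimeUnit.SECONDS).join();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3)));
    }
}
```

Closing the source triggers the processor's onComplete, which in turn closes the processor and completes the consumer's future, so only the head of the pipeline needs an explicit close in the normal path.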
Constructor Summary

SubmissionPublisher()
Creates a new SubmissionPublisher using the ForkJoinPool.commonPool() for async delivery to subscribers (unless it does not support a parallelism level of at least two, in which case a new Thread is created to run each task), with a maximum buffer capacity of Flow.defaultBufferSize(), and no handler for Subscriber exceptions in method onNext.

SubmissionPublisher(Executor executor, int maxBufferCapacity)
Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and no handler for Subscriber exceptions in method onNext.

SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and, if non-null, the given handler invoked when any Subscriber throws an exception in method onNext.
Method Summary

void close()
Unless already closed, issues onComplete signals to current subscribers, and disallows subsequent attempts to publish.

void closeExceptionally(Throwable error)
Unless already closed, issues onError signals to current subscribers with the given error, and disallows subsequent attempts to publish.

CompletableFuture<Void> consume(Consumer<? super T> consumer)
Processes all published items using the given Consumer function.

int estimateMaximumLag()
Returns an estimate of the maximum number of items produced but not yet consumed among all current subscribers.

long estimateMinimumDemand()
Returns an estimate of the minimum number of items requested (via request) but not yet produced, among all current subscribers.

Throwable getClosedException()
Returns the exception associated with closeExceptionally, or null if not closed or if closed normally.

Executor getExecutor()
Returns the Executor used for asynchronous delivery.

int getMaxBufferCapacity()
Returns the maximum per-subscriber buffer capacity.

int getNumberOfSubscribers()
Returns the number of current subscribers.

List<Flow.Subscriber<? super T>> getSubscribers()
Returns a list of current subscribers for monitoring and tracking purposes, not for invoking Flow.Subscriber methods on the subscribers.

boolean hasSubscribers()
Returns true if this publisher has any subscribers.

boolean isClosed()
Returns true if this publisher is not accepting submissions.

boolean isSubscribed(Flow.Subscriber<? super T> subscriber)
Returns true if the given Subscriber is currently subscribed.

int offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method.

int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method, blocking while resources for any subscription are unavailable, up to the specified timeout or until the caller thread is interrupted, at which point the given handler (if non-null) is invoked, and if it returns true, retried once.

int submit(T item)
Publishes the given item to each current subscriber by asynchronously invoking its onNext method, blocking uninterruptibly while resources for any subscriber are unavailable.

void subscribe(Flow.Subscriber<? super T> subscriber)
Adds the given Subscriber unless already subscribed.
Methods inherited from class java.lang.Object
public SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
public SubmissionPublisher(Executor executor, int maxBufferCapacity)
public SubmissionPublisher()
public void subscribe(Flow.Subscriber<? super T> subscriber)
public int submit(T item)
If the Executor for this publisher throws a RejectedExecutionException (or any other RuntimeException or Error) when attempting to asynchronously notify subscribers, then this exception is rethrown, in which case not all subscribers will have been issued this item.
public int offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
This method returns a status indicator: If negative, it represents the (negative) number of drops (failed attempts to issue the item to a subscriber). Otherwise it is an estimate of the maximum lag (number of items submitted but not yet consumed) among all current subscribers. This value is at least one (accounting for this submitted item) if there are any subscribers, else zero.
If the Executor for this publisher throws a RejectedExecutionException (or any other RuntimeException or Error) when attempting to asynchronously notify subscribers, or the drop handler throws an exception when processing a dropped item, then this exception is rethrown.
public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
This method returns a status indicator: If negative, it represents the (negative) number of drops (failed attempts to issue the item to a subscriber). Otherwise it is an estimate of the maximum lag (number of items submitted but not yet consumed) among all current subscribers. This value is at least one (accounting for this submitted item) if there are any subscribers, else zero.
If the Executor for this publisher throws a RejectedExecutionException (or any other RuntimeException or Error) when attempting to asynchronously notify subscribers, or the drop handler throws an exception when processing a dropped item, then this exception is rethrown.
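The status indicator returned by both offer methods can be decoded with a small helper such as the one sketched here. The describe method and its wording are hypothetical; the three cases follow the return-value description above (negative for drops, zero when there are no subscribers, otherwise an estimated lag of at least one).

```java
import java.util.concurrent.SubmissionPublisher;

public class OfferStatusDemo {
    /** Turns an offer return value into a human-readable description. */
    static String describe(int status) {
        if (status < 0) {
            return "dropped for " + (-status) + " subscriber(s)";
        }
        if (status == 0) {
            return "no subscribers";
        }
        return "accepted, estimated maximum lag " + status;
    }

    /** Offers one item to a publisher that has no subscribers. */
    static int offerWithoutSubscribers() {
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            return publisher.offer("ignored", null); // null: no drop handler
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(offerWithoutSubscribers()));
    }
}
```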
public void close()
public void closeExceptionally(Throwable error)
public boolean isClosed()
public Throwable getClosedException()
public boolean hasSubscribers()
public int getNumberOfSubscribers()
public Executor getExecutor()
public int getMaxBufferCapacity()
public List<Flow.Subscriber<? super T>> getSubscribers()
public boolean isSubscribed(Flow.Subscriber<? super T> subscriber)
public long estimateMinimumDemand()
public int estimateMaximumLag()