Cascade.java
/*
* Copyright 2017 Mackenzie High
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mackenziehigh.cascade;
import com.mackenziehigh.cascade.Cascade.Stage.Actor.Builder;
import com.mackenziehigh.cascade.Cascade.Stage.Actor.ConsumerErrorHandler;
import com.mackenziehigh.cascade.Cascade.Stage.Actor.Context;
import com.mackenziehigh.cascade.Cascade.Stage.Actor.ContextErrorHandler;
import com.mackenziehigh.cascade.Cascade.Stage.Actor.ContextScript;
import com.mackenziehigh.cascade.Cascade.Stage.Actor.Mailbox;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Micro Actor Framework.
*/
public interface Cascade
{
/**
* A group of <code>Actor</code>s with a common power supply.
*/
public interface Stage
{
/**
* Actor.
*
* @param <I> is the type of messages the actor will consume.
* @param <O> is the type of messages the actor will produce.
*/
public interface Actor<I, O>
{
/**
* Actor Builder.
*
* @param <I> is the type of messages the actor will consume.
* @param <O> is the type of messages the actor will produce.
*/
public interface Builder<I, O>
{
/**
* Define the normal behavior of the actor.
*
* <p>
* If a script was already defined, then the given
* script will replace the previously defined one.
* </p>
*
* <p>
* If the only instance of the script is held by a single actor,
* then the script will only ever handle one exception at a time.
* Thus, the code contained in the script is intrinsically thread-safe.
* </p>
*
* <p>
* <b>Warning:</b> If two actors share the same script object,
* then that script may be executed concurrently by the independent
* actors in order to process messages received independently by each.
* Thus, in that case, the script is <b>not</b> intrinsically thread-safe.
* </p>
*
* @param <X> is the type of messages the actor will consume.
* @param <Y> is the type of messages the actor will produce.
* @param script defines the message-handling behavior of the actor.
* @return a modified copy of this builder.
*/
public <X, Y> Builder<X, Y> withContextScript (ContextScript<X, Y> script);
/**
* Define the normal behavior of the actor.
*
* <p>
* If a script was already defined, then the given
* script will replace the previously defined one.
* </p>
*
* <p>
* If the only instance of the script is held by a single actor,
* then the script will only ever handle one exception at a time.
* Thus, the code contained in the script is intrinsically thread-safe.
* </p>
*
* <p>
* <b>Warning:</b> If two actors share the same script object,
* then that script may be executed concurrently by the independent
* actors in order to process messages received independently by each.
* Thus, in that case, the script is <b>not</b> intrinsically thread-safe.
* </p>
*
* @param <X> is the type of messages the actor will consume.
* @param <Y> is the type of messages the actor will produce.
* @param script defines the message-handling behavior of the actor.
* @return a modified copy of this builder.
*/
public default <X, Y> Builder<X, Y> withFunctionScript (FunctionScript<X, Y> script)
{
return withContextScript((ctx, input) ->
{
ctx.sendFrom(script.onInput(input));
});
}
/**
* Define the normal behavior of the actor.
*
* <p>
* If a script was already defined, then the given
* script will replace the previously defined one.
* </p>
*
* <p>
* If the only instance of the script is held by a single actor,
* then the script will only ever handle one exception at a time.
* Thus, the code contained in the script is intrinsically thread-safe.
* </p>
*
* <p>
* <b>Warning:</b> If two actors share the same script object,
* then that script may be executed concurrently by the independent
* actors in order to process messages received independently by each.
* Thus, in that case, the script is <b>not</b> intrinsically thread-safe.
* </p>
*
* @param <X> is the type of messages the actor will consume.
* @param script defines the message-handling behavior of the actor.
* @return a modified copy of this builder.
*/
public default <X> Builder<X, X> withConsumerScript (ConsumerScript<X> script)
{
return withFunctionScript(x ->
{
script.onInput(x);
return null;
});
}
/**
* Define how the actor responds to unhandled exceptions.
*
* <p>
* If an error-handler was already defined, then that error-handler
* and the given error-handler will be composed to form a new (third)
* error-handler that executes both error-handlers in sequence.
* In effect, this method appends the given error-handler onto
* the list of error-handlers that the actor will use.
* When an unhandled exception occurs, all of the handlers will execute.
* </p>
*
* <p>
* If the only instance of the error-handler is held by a single actor,
* then the error-handler will only ever handle one exception at a time.
* Thus, the code contained in the error-handler is intrinsically thread-safe.
* </p>
*
* <p>
* <b>Warning:</b> If two actors share the same error-handler object,
* then that error-handler may be executed concurrently by
* the independent actors in order to handle distinct exceptions.
* Thus, in that case, the error-handler is <b>not</b> intrinsically thread-safe.
* </p>
*
* <p>
* If the error-handler itself throws an exception,
* then that exception will be silently dropped.
* </p>
*
* @param handler defines the error-handling behavior of the actor.
* @return a modified copy of this builder.
*/
public Builder<I, O> withContextErrorHandler (ContextErrorHandler<I, O> handler);
/**
* Define how the actor responds to unhandled exceptions.
*
* <p>
* Equivalent: <code>withContextErrorHandler((context, message, cause) -> handler.onError(cause))</code>
* </p>
*
* @param handler defines the error-handling behavior of the actor.
* @return a modified copy of this builder.
*/
public default Builder<I, O> withConsumerErrorHandler (final ConsumerErrorHandler handler)
{
return withContextErrorHandler((context, message, cause) -> handler.onError(cause));
}
/**
* Cause the actor to use the given mailbox to store incoming messages.
*
* <p>
* <b>Warning:</b> The mailbox must ensure thread-safety.
* </p>
*
* @param queue will store incoming messages as they await processing.
* @return a modified copy of this builder.
*/
public Builder<I, O> withMailbox (Mailbox<I> queue);
/**
* Construct the actor and add it to the stage.
*
* @return the newly created actor.
*/
public Actor<I, O> create ();
}
/**
* A queue-like (FIFO) data-structure that stores incoming messages.
*
* @param <I> is the type of messages that the actor will consume.
*/
public interface Mailbox<I>
{
/**
* Add a message to the mailbox.
*
* @param message will be added to the mailbox, if possible.
* @return true, only if the message was in-fact added to the mailbox.
*/
public boolean offer (I message);
/**
* Remove a message from the mailbox.
*
* <p>
* If any message is in the mailbox, then this method <b>must</b> return non-null.
* In other words, a mailbox <b>cannot</b> choose to delay the removal
* of a message by forcing the caller to call this method again at a later time.
* </p>
*
* <p>
* A mailbox can choose to unilaterally drop messages.
* In other words, a message that was successfully <code>offer()</code>-ed
* to this mailbox may never be returned by <code>poll()</code>,
* at the sole discretion of the mailbox itself.
* </p>
*
* @return the message that was removed, or null, if no message was available.
*/
public I poll ();
}
/**
* Input to an Actor.
*
* @param <T> is the type of messages that the actor will consume.
*/
public interface Input<T>
{
/**
* Get the actor that this input pertains to.
*
* @return the enclosing actor.
*/
public Actor<T, ?> actor ();
/**
* Connect this input to the given output of another actor.
*
* <p>
* This method is a no-op, if the connection already exists.
* </p>
*
* <p>
* Implementations should <b>not</b> override the default
* behavior of this method as defined in this interface.
* </p>
*
* @param output will send messages to this input.
* @return this.
*/
public default Input<T> connect (final Output<T> output)
{
Objects.requireNonNull(output, "output");
output.connect(this);
return this;
}
/**
* Disconnect this input from the given output.
*
* <p>
* This method is a no-op, if the connection does not exist.
* </p>
*
* <p>
* Implementations should <b>not</b> override the default
* behavior of this method as defined in this interface.
* </p>
*
* @param output will no longer be connected.
* @return this.
*/
public default Input<T> disconnect (final Output<T> output)
{
Objects.requireNonNull(output, "output");
output.disconnect(this);
return this;
}
/**
* Determine whether this input is connected to the given output.
*
* <p>
* Implementations should <b>not</b> override the default
* behavior of this method as defined in this interface.
* </p>
*
* @param output may be connected to this input.
* @return true, if this input is currently connected to the output.
*/
public default boolean isConnected (final Output<?> output)
{
return output.isConnected(this);
}
/**
* Send a message to the actor via this input, silently dropping the message,
* if this input does not have sufficient capacity to enqueue the message.
*
* <p>
* Equivalent: <code>return actor().context().offerTo(message);</code>
* </p>
*
* @param message will be processed by the actor, eventually,
* if the message is not dropped due to capacity restrictions.
* @return true, if the message was successfully added to the underlying mailbox.
* @throws NullPointerException if the <code>message</code> is null.
*/
public default boolean offer (T message)
{
return actor().context().offerTo(message);
}
/**
* Send a message to the actor via this input.
*
* <p>
* Equivalent: <code>offer(message); return this;</code>
* </p>
*
* @param message will be processed by the actor, eventually,
* if the message was not dropped due to capacity restrictions.
* @return this.
*/
public default Input<T> send (final T message)
{
offer(message);
return this;
}
}
/**
* Output to an Actor.
*
* @param <T> is the type of messages that the actor will produce.
*/
public interface Output<T>
{
/**
* Get the actor that this input pertains to.
*
* @return the enclosing actor.
*/
public Actor<?, T> actor ();
/**
* Connect this output to the given input of another actor.
*
* <p>
* This method is a no-op, if the connection already exists.
* </p>
*
* <p>
* When implementing this method, care must be taken to ensure
* that concurrent connections and/or disconnection do not
* lead to incorrect states, such as duplicate connections.
* </p>
*
* @param input will be sent messages from this output.
* @return this.
*/
public Output<T> connect (Input<T> input);
/**
* Disconnect this output from the given input.
*
* <p>
* This method is a no-op, if the connection does not exist.
* </p>
*
* <p>
* When implementing this method, care must be taken to ensure
* that concurrent connections and/or disconnection do not
* lead to incorrect states, such as duplicate connections.
* </p>
*
* @param input will no longer be connected.
* @return this.
*/
public Output<T> disconnect (Input<T> input);
/**
* Determine whether this output is connected to the given input.
*
* @param input may be connected to this output.
* @return true, if this output is currently connected to the input.
*/
public boolean isConnected (final Input<?> input);
}
/**
* Script Execution Context.
*
* @param <I> is the type of messages that the enclosing actor consumes.
* @param <O> is the type of messages that the enclosing actor produces.
*/
public interface Context<I, O>
{
/**
* Get the enclosing actor.
*
* @return the actor that owns this context.
*/
public Actor<I, O> actor ();
/**
* Offer a message <b>to</b> the enclosing actor.
*
* @param message is the message to send to the actor.
* @return true, if the message was successfully added to the underlying mailbox.
*/
public boolean offerTo (I message);
/**
* Offer a message <b>from</b> the enclosing actor.
*
* @param message is the message to send from the actor.
* @return true, if the message was sent to every connected output.
*/
public boolean offerFrom (O message);
/**
* Send a message <b>to</b> the enclosing actor.
*
* @param message is the message to send to the actor.
* @return this.
*/
public default Context<I, O> sendTo (final I message)
{
offerTo(message);
return this;
}
/**
* Send a message <b>from</b> the enclosing actor.
*
* @param message is the message to send from the actor.
* @return this.
*/
public default Context<I, O> sendFrom (final O message)
{
offerFrom(message);
return this;
}
}
/**
* Actor Behavior.
*
* @param <I> is the type of messages that the actor will consume.
* @param <O> is the type of messages that the actor will produce.
*/
@FunctionalInterface
public interface ContextScript<I, O>
{
/**
* This method will be invoked by the enclosing actor
* in order to process all incoming messages.
*
* @param context can be used to send messages from the actor, etc.
* @param input is being processed by the actor using this script.
* @throws Throwable or a sub-class thereof, at the discretion of the implementation.
*/
public void onInput (Context<I, O> context,
I input)
throws Throwable;
}
/**
* Actor Behavior.
*
* @param <I> is the type of messages that the actor will consume.
* @param <O> is the type of messages that the actor will produce.
*/
@FunctionalInterface
public interface FunctionScript<I, O>
{
/**
* This method will be invoked by the enclosing actor
* in order to process all incoming messages.
*
* @param input is being processed by the actor using this script.
* @return the output message to send from the actor, or null,
* if the actor shall not produce an output for the given input.
* @throws Throwable or a sub-class thereof, at the discretion of the implementation.
*/
public O onInput (I input)
throws Throwable;
}
/**
* Actor Behavior.
*
* @param <I> is the type of messages that the actor will consume.
*/
@FunctionalInterface
public interface ConsumerScript<I>
{
/**
* This method will be invoked by the enclosing actor
* in order to process all incoming messages.
*
* @param input is being processed by the actor using this script.
* @throws Throwable or a sub-class thereof, at the discretion of the implementation.
*/
public void onInput (I input)
throws Throwable;
}
/**
* Actor Error Handler.
*
* @param <I> is the type of messages that the actor will consume.
* @param <O> is the type of messages that the actor will produce.
*/
@FunctionalInterface
public interface ContextErrorHandler<I, O>
{
/**
* This method will be invoked by the enclosing actor in order to
* handle any unhandled exceptions that are thrown by the script.
*
* <p>
* The <code>message</code> is not available, in particular,
* if the exception occurred due to a <code>Mailbox.poll()</code>.
* </p>
*
* @param context can be used to send messages from the actor, etc.
* @param message was being processed when the exception occurred, if available.
* @param cause was thrown by the script and unhandled elsewhere.
* @throws Throwable if something goes unexpectedly wrong.
*/
public void onError (Context<I, O> context,
I message,
Throwable cause)
throws Throwable;
/**
* Compose this script within another script, such that any
* exceptions thrown by this script will be silently ignored.
*
* @return the new script that contains this script.
*/
public default ContextErrorHandler<I, O> silent ()
{
return (context, message, cause) ->
{
try
{
onError(context, message, cause);
}
catch (Throwable ex)
{
// Pass.
}
};
}
/**
* Compose this script and the given script into a single script.
*
* <p>
* If either script throws an exception, then the exception will be silently dropped.
* </p>
*
* @param after will come after this script inside of the new script.
* @return the new script.
*/
public default ContextErrorHandler<I, O> andThen (final ContextErrorHandler<I, O> after)
{
final ContextErrorHandler<I, O> first = silent();
final ContextErrorHandler<I, O> second = after.silent();
return (context, message, cause) ->
{
first.onError(context, message, cause);
second.onError(context, message, cause);
};
}
/**
* Compose this script and the given script into a single script.
*
* <p>
* If either script throws an exception, then the exception will be silently dropped.
* </p>
*
* @param after will come after this script inside of the new script.
* @return the new script.
*/
public default ContextErrorHandler<I, O> andThen (final ConsumerErrorHandler after)
{
final ContextErrorHandler<I, O> first = silent();
final ConsumerErrorHandler second = after.silent();
return (context, message, cause) ->
{
first.onError(context, message, cause);
second.onError(cause);
};
}
}
/**
* Actor Error Handler.
*/
@FunctionalInterface
public interface ConsumerErrorHandler
{
/**
* This method will be invoked by the enclosing actor in order to
* handle any unhandled exceptions that are thrown by the script.
*
* @param cause was thrown by the script and unhandled elsewhere.
* @throws Throwable if something goes unexpectedly wrong.
*/
public void onError (Throwable cause)
throws Throwable;
/**
* Compose this script within another script, such that any
* exceptions thrown by this script will be silently ignored.
*
* @return the new script that contains this script.
*/
public default ConsumerErrorHandler silent ()
{
return (cause) ->
{
try
{
onError(cause);
}
catch (Throwable ex)
{
// Pass.
}
};
}
/**
* Compose this script and the given script into a single script.
*
* <p>
* If either script throws an exception, then the exception will be silently dropped.
* </p>
*
* @param after will come after this script inside of the new script.
* @return the new script.
*/
public default ConsumerErrorHandler andThen (final ConsumerErrorHandler after)
{
final ConsumerErrorHandler first = silent();
final ConsumerErrorHandler second = after.silent();
return (cause) ->
{
first.onError(cause);
second.onError(cause);
};
}
}
/**
* Get the <code>Stage</code> that contains this actor.
*
* @return the enclosing stage.
*/
public Stage stage ();
/**
* Get the <code>Context</code> that is passed into <code>ContextScript</code>s.
*
* @return the context used when executing scripts.
*/
public Context<I, O> context ();
/**
* Get the <code>Input</code> that supplies messages to this actor.
*
* @return the input to the actor.
*/
public Input<I> input ();
/**
* Get the <code>Output</code> that receives messages from this actor.
*
* @return the output from the actor.
*/
public Output<O> output ();
}
/**
* Create a builder that can be used to add a new actor to this stage.
*
* <p>
* This method returns a builder, rather than an actor itself,
* so that further configuration of the actor can be performed,
* if the calling code so desires.
* </p>
*
* @param <I> is the type of messages that the actor will consume.
* @param <O> is the type of messages that the actor will produce.
* @return the new builder.
*/
public <I, O> Actor.Builder<I, O> newActor ();
/**
* Asynchronously shutdown this stage, as soon as reasonably possible.
*
* <p>
* Subsequent invocations of this method are idempotent.
* </p>
*/
public void close ();
}
/**
* A <code>Mailbox</code> implementation based on a <code>ConcurrentLinkedQueue</code>.
*
* @param <I> is the type of messages that will be stored in the mailbox.
*/
public static final class ConcurrentLinkedQueueMailbox<I>
implements Mailbox<I>
{
private final ConcurrentLinkedQueue<I> queue;
private ConcurrentLinkedQueueMailbox (final ConcurrentLinkedQueue<I> queue)
{
this.queue = queue;
}
/**
* Create a new mailbox.
*
* @param <I> is the type of messages that will be stored in the mailbox.
* @return the new mailbox.
*/
public static <I> Mailbox<I> create ()
{
return new ConcurrentLinkedQueueMailbox<>(new ConcurrentLinkedQueue<>());
}
/**
* {@inheritDoc}
*/
@Override
public boolean offer (final I message)
{
return queue.offer(message);
}
/**
* {@inheritDoc}
*/
@Override
public I poll ()
{
return queue.poll();
}
}
/**
* A <code>Mailbox</code> implementation based on a <code>LinkedBlockingQueue</code>.
*
* @param <I> is the type of messages that will be stored in the mailbox.
*/
public static final class LinkedBlockingQueueMailbox<I>
implements Mailbox<I>
{
private final LinkedBlockingQueue<I> queue;
private LinkedBlockingQueueMailbox (final LinkedBlockingQueue<I> queue)
{
this.queue = queue;
}
/**
* Create a new mailbox.
*
* @param <I> is the type of messages that will be stored in the mailbox.
* @return the new mailbox.
*/
public static <I> Mailbox<I> create ()
{
return new LinkedBlockingQueueMailbox<>(new LinkedBlockingQueue<>());
}
/**
* Create a new mailbox.
*
* @param <I> is the type of messages that will be stored in the mailbox.
* @param capacity is the maximum number of messages that can be stored simultaneously.
* @return the new mailbox.
*/
public static <I> Mailbox<I> create (final int capacity)
{
return new LinkedBlockingQueueMailbox<>(new LinkedBlockingQueue<>(capacity));
}
/**
* {@inheritDoc}
*/
@Override
public boolean offer (final I message)
{
return queue.offer(message);
}
/**
* {@inheritDoc}
*/
@Override
public I poll ()
{
return queue.poll();
}
}
/**
* A <code>Mailbox</code> implementation based on a <code>ArrayBlockingQueue</code>.
*
* @param <I> is the type of messages that will be stored in the mailbox.
*/
public static final class ArrayBlockingQueueMailbox<I>
implements Mailbox<I>
{
private final ArrayBlockingQueue<I> queue;
private ArrayBlockingQueueMailbox (final ArrayBlockingQueue<I> queue)
{
this.queue = queue;
}
/**
* Create a new mailbox.
*
* @param <I> is the type of messages that will be stored in the mailbox.
* @param capacity is the maximum number of messages that can be stored simultaneously.
* @return the new mailbox.
*/
public static <I> Mailbox<I> create (final int capacity)
{
return new ArrayBlockingQueueMailbox<>(new ArrayBlockingQueue<>(capacity));
}
/**
* {@inheritDoc}
*/
@Override
public boolean offer (final I message)
{
return queue.offer(message);
}
/**
* {@inheritDoc}
*/
@Override
public I poll ()
{
return queue.poll();
}
}
/**
* A <code>Mailbox</code> implementation based on a <code>ArrayDeque</code>.
*
* @param <I> is the type of messages that will be stored in the mailbox.
*/
public static final class ArrayDequeMailbox<I>
implements Mailbox<I>
{
private final ArrayDeque<I> queue;
private final int capacity;
private ArrayDequeMailbox (final ArrayDeque<I> queue,
final int capacity)
{
this.queue = queue;
this.capacity = capacity;
}
/**
* Create a new mailbox.
*
* @param <I> is the type of messages that will be stored in the mailbox.
* @param initial is the initial size of the backing data-structure.
* @param capacity is the maximum number of messages that can be stored simultaneously.
* @return the new mailbox.
*/
public static <I> Mailbox<I> create (final int initial,
final int capacity)
{
return new ArrayDequeMailbox<>(new ArrayDeque<>(initial), capacity);
}
/**
* {@inheritDoc}
*
* <p>
* Notice that this method is synchronized.
* </p>
*/
@Override
public synchronized boolean offer (final I message)
{
if (queue.size() == capacity)
{
return false;
}
else
{
return queue.offer(message);
}
}
/**
* {@inheritDoc}
*
* <p>
* Notice that this method is synchronized.
* </p>
*/
@Override
public synchronized I poll ()
{
return queue.poll();
}
}
/**
* A <code>Mailbox</code> implementation based on a <code>PriorityBlockingQueue</code>.
*
* @param <I> is the type of messages that will be stored in the mailbox.
*/
public static final class PriorityBlockingQueueMailbox<I>
implements Mailbox<I>
{
private final PriorityBlockingQueue<I> queue;
private PriorityBlockingQueueMailbox (final PriorityBlockingQueue<I> queue)
{
this.queue = queue;
}
/**
* Create a new mailbox.
*
* @param <I> is the type of messages that will be stored in the mailbox.
* @param initial is the initial size of the backing data-structure.
* @param ordering assigns priorities to messages in the mailbox.
* @return the new mailbox.
*/
public static <I> Mailbox<I> create (final int initial,
final Comparator<I> ordering)
{
return new PriorityBlockingQueueMailbox<>(new PriorityBlockingQueue<>(initial, ordering));
}
/**
* {@inheritDoc}
*/
@Override
public boolean offer (final I message)
{
return queue.offer(message);
}
/**
* {@inheritDoc}
*/
@Override
public I poll ()
{
return queue.poll();
}
}
/**
* A <code>Mailbox</code> implementation based on an <code>ArrayDeque</code>,
* which behaves like a ring-buffer data-structure.
*
* @param <I> is the type of messages that will be stored in the mailbox.
*/
public static final class CircularArrayDequeMailbox<I>
implements Mailbox<I>
{
private final ArrayDeque<I> queue;
private final int capacity;
private CircularArrayDequeMailbox (final ArrayDeque<I> queue,
final int capacity)
{
this.queue = queue;
this.capacity = capacity;
}
/**
* Create a new mailbox.
*
* @param <I> is the type of messages that will be stored in the mailbox.
* @param initial is the initial size of the backing data-structure.
* @param capacity is the maximum number of messages that can be stored simultaneously.
* @return the new mailbox.
*/
public static <I> Mailbox<I> create (final int initial,
final int capacity)
{
return new CircularArrayDequeMailbox<>(new ArrayDeque<>(initial), capacity);
}
/**
* {@inheritDoc}
*
* <p>
* Notice that this method is synchronized.
* </p>
*/
@Override
public synchronized boolean offer (final I message)
{
if (queue.size() == capacity)
{
queue.poll();
queue.offer(message);
return true;
}
else
{
queue.offer(message);
return true;
}
}
/**
* {@inheritDoc}
*
* <p>
* Notice that this method is synchronized.
* </p>
*/
@Override
public synchronized I poll ()
{
return queue.poll();
}
}
/**
* Partial Implementation of <code>Stage</code>.
*/
public static abstract class AbstractStage
implements Cascade.Stage
{
private final Stage STAGE = this;
private final AtomicBoolean stageClosed = new AtomicBoolean(false);
/**
* This method will be invoked whenever an actor needs executed.
*
* <p>
* This method will not be re-invoked, until the actor finishes
* being executed, even if the actor determines that it needs
* to be executed again. Rather, the actor will invoke this
* method again, if needed, at the end of its execution.
* This strategy helps ensure that no two threads will
* ever power the actor concurrently. Moreover,
* this strategy lessons the amount of memory used
* by some implementations in order to schedule actors.
* </p>
*
* <p>
* Implementations of this method should never throw exceptions.
* If an exception or error is thrown, then the stage will be closed.
* </p>
*
* @param actor needs to be <code>run()</code> at some point in the future.
*/
protected abstract void onRunnable (DefaultActor<?, ?> actor);
/**
* This method will be invoked when this stage closes.
*/
protected abstract void onClose ();
/**
* {@inheritDoc}
*/
@Override
public final <I, O> Actor.Builder<I, O> newActor ()
{
return new DefaultActorBuilder<>();
}
/**
* {@inheritDoc}
*/
@Override
public final void close ()
{
if (stageClosed.compareAndSet(false, true))
{
onClose();
}
}
/**
* This method protects against exceptions thrown in the overridden <code>onSubmit()</code> method.
* If an exception is thrown in that method, then the stage must be shutdown,
* since we would be unable to ensure that all pending tasks get executed.
*
* @param actor needs scheduled for execution.
*/
private void safelySchedule (final DefaultActor<?, ?> actor)
{
try
{
onRunnable(actor);
}
catch (Throwable ex)
{
close();
}
}
/**
* Default Implementation of the <code>Actor.Builder</code> interface.
*
* @param <I> is the type of messages that the actor will consume.
* @param <O> is the type of messages that the actor will produce.
*/
private final class DefaultActorBuilder<I, O>
implements Cascade.Stage.Actor.Builder<I, O>
{
private final Mailbox<I> mailbox;
private final ContextScript<I, O> script;
private final ContextErrorHandler<I, O> errorHandler;
private DefaultActorBuilder ()
{
this.mailbox = ConcurrentLinkedQueueMailbox.create();
this.script = (context, message) ->
{
// Pass.
};
this.errorHandler = (context, message, cause) ->
{
// Pass.
};
}
private DefaultActorBuilder (final Mailbox<I> mailbox,
final ContextScript<I, O> script,
final ContextErrorHandler<I, O> errorHandler)
{
this.mailbox = mailbox;
this.script = script;
this.errorHandler = errorHandler;
}
@Override
public <X, Y> Actor.Builder<X, Y> withContextScript (final Stage.Actor.ContextScript<X, Y> script)
{
Objects.requireNonNull(script, "script");
return new DefaultActorBuilder(mailbox, script, errorHandler);
}
@Override
public Actor.Builder<I, O> withContextErrorHandler (final ContextErrorHandler<I, O> handler)
{
Objects.requireNonNull(handler, "handler");
/**
* Combine the given handler and any previously defined handlers.
* Execute each of the handlers in sequence, even if one fails.
* If any handler throws an exception, simply ignore it.
* In general, an error-handler should not cause an error itself.
*/
final ContextErrorHandler<I, O> combined = errorHandler.andThen(handler);
return new DefaultActorBuilder(mailbox, script, combined);
}
@Override
public Actor.Builder<I, O> withMailbox (final Mailbox<I> mailbox)
{
Objects.requireNonNull(mailbox, "mailbox");
return new DefaultActorBuilder(mailbox, script, errorHandler);
}
@Override
public Actor<I, O> create ()
{
final DefaultActor<I, O> actor = new DefaultActor<>(this);
return actor;
}
}
/**
* Default Actor Implementation.
*
* <p>
* A (meta) object is stored herein, which is intended
* for use by implementing sub-classes, so that they
* can store actor specific information.
* </p>
*
* @param <I> is the type of the messages incoming to the actor.
* @param <O> is the type of the messages outgoing from the actor.
*/
public final class DefaultActor<I, O>
implements Cascade.Stage.Actor<I, O>,
Runnable
{
/**
* This reference just makes 'this' more explicit in
* order to avoid confusion caused by nested classes.
*/
private final DefaultActor<I, O> ACTOR = this;
/**
* This mailbox stores the backlog of messages that
* need to be processed by this actor one at a time.
*/
private final Mailbox<I> mailbox;
/**
* This script will be used to process those messages.
*/
private final ContextScript<I, O> script;
/**
* If that script throws an unhandled exception,
* then this error-handler will be invoked in
* order to handle the exception.
*/
private final ContextErrorHandler<I, O> errorHandler;
/**
* This object provides the ability to send messages to
* and from this actor and will be passed-in to the script.
*/
private final DefaultContext context = new DefaultContext();
/**
* This object provides the input-connector API and wraps the mailbox.
*/
private final DefaultInput input = new DefaultInput();
/**
* This object provides the output-connector API.
*/
private final DefaultOutput output = new DefaultOutput();
/**
* This is the number of messages that are in the mailbox.
*/
private final AtomicLong pendingCranks = new AtomicLong();
/**
* This flag is simply used as a sanity check to detect bugs,
* if the run() method is executed concurrently; therefore,
* this may be removed at some point in the future.
*/
private final AtomicBoolean inProgress = new AtomicBoolean(false);
/**
* This field can be used by custom stage implementations
* to store implementation-specific information.
*/
private volatile Object meta = null;
private DefaultActor (final DefaultActorBuilder<I, O> builder)
{
this.errorHandler = builder.errorHandler;
this.mailbox = builder.mailbox;
this.script = builder.script;
}
@Override
public void run ()
{
if (inProgress.compareAndSet(false, true) == false)
{
/**
* This should never actually happen, period; however, the likely cause is either:
* (1) the AbstractStage implementation called run() twice for one onRunnable() call,
* (2) the scheduling algorithm in this class is fundamentally broken.
* In the case of custom stages, case (1) is the most likely cause.
*/
throw new IllegalStateException("concurrent run()");
}
I message = null;
try
{
/**
* Pull the next message from the mailbox and
* then process the message using the script.
*/
message = mailbox.poll();
if (message != null)
{
script.onInput(context, message);
}
}
catch (Throwable cause)
{
/**
* Invoke the error-handler given the message and exception,
* but do not allow the error-handler to throw an exception.
* If the poll() threw the exception, then the message is null.
*/
handleException(message, cause);
}
finally
{
/**
* Now that the processing of the message is complete,
* go ahead and schedule the next message, if any.
*/
inProgress.set(false);
scheduleSubsequentMessage();
}
}
private void handleException (final I message,
final Throwable cause)
{
try
{
errorHandler.onError(context, message, cause);
}
catch (Throwable ignored)
{
// Pass, because errors from error-handlers cannot be reasonably handled.
}
}
private void scheduleInitialMessage ()
{
if (pendingCranks.incrementAndGet() == 1)
{
safelySchedule(ACTOR);
}
}
private void scheduleSubsequentMessage ()
{
if (pendingCranks.decrementAndGet() != 0)
{
safelySchedule(ACTOR);
}
}
@Override
public Stage stage ()
{
return STAGE;
}
@Override
public Context<I, O> context ()
{
return context;
}
@Override
public Input<I> input ()
{
return input;
}
@Override
public Output<O> output ()
{
return output;
}
public Object meta ()
{
return meta;
}
public void meta (final Object value)
{
meta = value;
}
private final class DefaultContext
implements Context<I, O>
{
@Override
public Actor<I, O> actor ()
{
return ACTOR;
}
@Override
public boolean offerFrom (final O message)
{
boolean sentToAll = true;
if (message != null)
{
final List<Input<O>> outputs = output.connectionList;
final int length = outputs.size();
// Using for instead of for-each avoids creating an iterator object.
for (int i = 0; i < length; i++)
{
sentToAll &= outputs.get(i).offer(message);
}
}
return sentToAll;
}
@Override
public boolean offerTo (final I message)
{
Objects.requireNonNull(message, "message");
if (mailbox.offer(message))
{
scheduleInitialMessage();
return true;
}
else
{
return false;
}
}
};
/**
* Default Implementation of <code>Actor.Input</code>.
*/
private final class DefaultInput
implements Actor.Input<I>
{
@Override
public Actor<I, ?> actor ()
{
return ACTOR;
}
}
/**
* Default Implementation of <code>Actor.Output</code>.
*/
private final class DefaultOutput
implements Actor.Output<O>
{
/**
* This lock is used to prevent concurrent connections and disconnections;
* however, this lock is not in the critical-path of message processing.
*/
private final Object outputLock = new Object();
/**
* This is an immutable list containing the inputs that this output is connected to.
*/
private volatile List<Input<O>> connectionList = newImmutableList(Collections.EMPTY_LIST);
@Override
public Actor<?, O> actor ()
{
return ACTOR;
}
@Override
public Output<O> connect (final Stage.Actor.Input<O> input)
{
Objects.requireNonNull(input, "input");
synchronized (outputLock)
{
if (isConnected(input) == false)
{
final List<Input<O>> modified = new ArrayList<>(connectionList);
modified.add(input);
connectionList = newImmutableList(modified);
}
}
return this;
}
@Override
public Output<O> disconnect (final Stage.Actor.Input<O> input)
{
Objects.requireNonNull(input, "input");
synchronized (outputLock)
{
if (isConnected(input))
{
final List<Input<O>> modified = new ArrayList<>(connectionList);
modified.remove(input);
connectionList = newImmutableList(modified);
}
}
return this;
}
@Override
public boolean isConnected (final Input<?> input)
{
return connectionList.contains(input);
}
}
}
private static <T> List<T> newImmutableList (final Collection<T> collection)
{
return List.copyOf(collection);
}
}
/**
* Create a new single-threaded stage.
*
* <p>
* The stage will use a non-daemon thread.
* </p>
*
* @return the new stage.
*/
public static Stage newStage ()
{
return newStage(1);
}
/**
* Create a new multi-threaded stage.
*
* <p>
* The stage will use non-daemon threads.
* </p>
*
* @param threadCount is the number of worker threads that the stage will use.
* @return the new stage.
*/
public static Stage newStage (final int threadCount)
{
return newStage(threadCount, false);
}
/**
* Create a new multi-threaded stage.
*
* @param threadCount is the number of worker threads that the stage will use.
* @param daemon is true, if the threads will be daemon threads.
* @return the new stage.
*/
public static Stage newStage (final int threadCount,
final boolean daemon)
{
final ThreadFactory factory = (Runnable task) ->
{
final Thread thread = new Thread(task);
thread.setDaemon(daemon);
return thread;
};
final ExecutorService service = Executors.newFixedThreadPool(threadCount, factory);
return newStage(service);
}
/**
* Create a new stage based on a given <code>ExecutorService</code>.
*
* @param service will power the new stage.
* @return the new stage.
*/
public static Stage newStage (final ExecutorService service)
{
Objects.requireNonNull(service, "service");
return new AbstractStage()
{
@Override
protected void onRunnable (final DefaultActor<?, ?> actor)
{
/**
* Schedule the actor to run at some point in the future.
* The actor itself, without blocking, will guarantee that
* it is only being run() by one thread at a time.
*/
service.execute(actor);
}
@Override
protected void onClose ()
{
service.shutdown();
}
};
}
}