Write your own consumer with multiple output ports
For example, consider the InstanceOfFilter below. It relays the element from its default input port (got from its super class AbstractConsumerStage) to its self-declared output port if the element is an instance of the class represented by type. The logics is implemented in the execute() method.
public class InstanceOfFilter<I, O> extends AbstractConsumerStage<I> { private final OutputPort<O> matchingOutputPort = this.createOutputPort(); private final OutputPort<I> defaultOutputPort = this.createOutputPort(); private final Class<O> type; public InstanceOfFilter(final Class<O> type) { this.type = type; } @Override protected void execute(final I element) { if (this.type.isInstance(element)) { this.matchingOutputPort.send((O) element); } else { this.defaultOutputPort.send((I) element); } } public Class<O> getType() { return this.type; } public OutputPort<O> getMatchingOutputPort() { return this.matchingOutputPort; } public OutputPort<O> getDefaultOutputPort() { return this.defaultOutputPort; } }
The method execute is used to differentiate between incoming elements. Two different output ports are used to forward matching and non-matching elements. To send elements to a specific port, the method send(element) of each output port can be used.
The self-declared ports are represented as class fields. The method createNewOutputPort() creates new instances and registers them.
Furthermore, ports need to be accessible by the outer environment, for port connection actions i.e.. Therefore, the fields containing the ports need to be declared public, or methods need to be implemented which allow access to those fields. Such a getter for the input port is already provided by the super class.