Writing a Stage with a State
Although a stage should be stateless where possible, some stages need to keep state.
What is a State?
Consider the example stage Counter below, which counts the elements that pass from its input port to its output port. To do this, the stage must retain data across several executions. Here, the Counter keeps a counter variable.
public final class Counter<T> extends AbstractConsumerStage<T> {

    private final OutputPort<T> outputPort = this.createOutputPort();

    // The state of this stage: the number of elements forwarded so far.
    private int numElementsPassed;

    @Override
    protected void execute(final T element) {
        this.numElementsPassed++;
        this.outputPort.send(element);
    }

    /**
     * <i>Hint:</i> This method must not be invoked from another thread because it is not thread-safe.
     */
    public int getNumElementsPassed() {
        return this.numElementsPassed;
    }

    public OutputPort<T> getOutputPort() {
        return this.outputPort;
    }
}
How to Define a State?
A state is represented by one or more instance fields and corresponding getter and setter methods within the stage.
Note: Each state attribute must be implemented in a thread-safe way if it is shared by multiple threads.
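As an illustration of the note above, here is a minimal sketch of one way to make a counter state thread-safe, assuming plain Java and java.util.concurrent.atomic.AtomicInteger. It deliberately does not depend on the stage framework: the class ThreadSafeCounterState and its methods onElementPassed and countConcurrently are hypothetical names introduced only for this example. In a real stage, the field and its getter and setter would live in the stage class, and the increment would happen inside execute.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical, framework-independent sketch: the state of a counting stage,
 * held in an AtomicInteger so that other threads may read or reset it safely.
 */
final class ThreadSafeCounterState {

    // The state attribute. AtomicInteger makes updates atomic and visible across threads.
    private final AtomicInteger numElementsPassed = new AtomicInteger();

    /** Would be called once per element, e.g. from a stage's execute method. */
    void onElementPassed() {
        this.numElementsPassed.incrementAndGet();
    }

    /** Getter: safe to call from any thread. */
    int getNumElementsPassed() {
        return this.numElementsPassed.get();
    }

    /** Setter: safe to call from any thread, e.g. to reset the counter. */
    void setNumElementsPassed(final int value) {
        this.numElementsPassed.set(value);
    }

    /** Demo helper: several threads update the same state concurrently. */
    static int countConcurrently(final int numThreads, final int elementsPerThread) {
        final ThreadSafeCounterState state = new ThreadSafeCounterState();
        final Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < elementsPerThread; j++) {
                    state.onElementPassed();
                }
            });
            threads[i].start();
        }
        for (final Thread thread : threads) {
            try {
                thread.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return state.getNumElementsPassed();
    }
}
```

With a plain int field, concurrent increments could be lost and a reading thread might see a stale value. The atomic variant avoids both at the cost of slightly more expensive updates, so it is only worth using when the state is actually shared between threads.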