Hello people!
I have a DataStream whose events carry a number that signifies which production cycle they belong to. In essence, this is what the data looks like:

    value, production cycle
    12.0,  2000
    12.3,  2000   <- one production cycle
    12.2,  2000
     0.0,  2001
     0.4,  2002   <- another production cycle
     1.1,  2002
    55.0,  2003
    60.0,  2003   <- another production cycle
    70.0,  2003

I have to do some calculations over the events of each production cycle, and I want to use Flink's window API for that. This is how I'm doing it right now:

    DataStream<String> test = streamExecEnv.readTextFile("C:/Projects/Python/testdata.txt")
        .map(new ImaginePaperDataConverterTest())                  // convert data to POJO
        .assignTimestampsAndWatermarks(new ImaginePaperAssigner()) // assign timestamps for event time
        .keyBy((ImaginePaperData event) -> event.lunum)            // <- the production cycle number
        .window(GlobalWindows.create())                            // create global window
        .trigger(new LunumTrigger())                               // "split" the window with a custom trigger
        .process(new ImaginePaperWindowReportFunction());          // apply a function over the aggregated events

I'm getting a "DataStream" out of a text file, just for testing purposes. The problem is that what I'm doing only aggregates one single event per production cycle. Why is that? I thought keying the stream by the production cycle number already partitions the stream anyway. The trigger is meant to start a new global window whenever the production cycle number changes, aggregating the events of the current window. What am I missing here?

Just to be safe, here is my implementation of the custom trigger:

    public class LunumTrigger extends Trigger<ImaginePaperData, GlobalWindow> {

        private static final long serialVersionUID = 1L;

        public LunumTrigger() {}

        private final ValueStateDescriptor<Integer> prevLunum =
            new ValueStateDescriptor<>("lunum", Integer.class);

        @Override
        public TriggerResult onElement(ImaginePaperData element, long timestamp,
                GlobalWindow window, TriggerContext ctx) throws Exception {
            ValueState<Integer> lunumState = ctx.getPartitionedState(prevLunum);

            if (lunumState.value() == null || !(element.lunum.equals(lunumState.value()))) {
                System.out.println("LUNUM BEFORE: " + lunumState.value()
                    + " NEW LUNUM: " + element.lunum + " ==> FIRE!");
                lunumState.update(element.lunum);
                return TriggerResult.FIRE_AND_PURGE;
            }

            System.out.println("LUNUM BEFORE: " + lunumState.value()
                + " NEW LUNUM: " + element.lunum + " ==> HOLD!");
            lunumState.update(element.lunum);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window,
                TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window,
                TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(prevLunum).clear();
        }
    }

I'm very grateful for your help.

Regards,
Daniel
Windowing and triggering on a keyed stream is done independently for each key. So for each key, your custom trigger is observing when the lunumState changes from null to a production cycle number, but it will never change again -- because only those stream elements with the same key will be processed in the context of that item of partitioned state.

One advantage of windowing on keyed streams is the parallelism that's made possible by partitioning by key -- but in your case there's probably little to be gained, assuming the production cycles are sequential rather than overlapping.

You could proceed by (1) not keying the stream, (2) adapting ImaginePaperWindowReportFunction to only process events for the cycle that just ended (if necessary), and (3) writing a custom evictor to remove events once they've been reported on.
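To make that concrete, here is a rough, untested sketch of steps (1)-(3), assuming the Flink 1.x evictor API. "events" stands for the mapped and timestamped stream from your snippet, and ReportedCycleEvictor is a made-up name; the trigger changes described in the comments are assumptions on my part, not something I've run:

    // With no keyBy, windowAll replaces keyBy/window. LunumTrigger would need
    // two changes: fire only when the lunum actually changes (not on the very
    // first element), and return TriggerResult.FIRE instead of FIRE_AND_PURGE,
    // leaving cleanup to the evictor.
    events
        .windowAll(GlobalWindows.create())
        .trigger(new LunumTrigger())
        .evictor(new ReportedCycleEvictor())
        .process(new ImaginePaperWindowReportFunction()); // now a ProcessAllWindowFunction

And a possible evictor for step (3):

    import java.util.Iterator;
    import org.apache.flink.streaming.api.windowing.evictors.Evictor;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

    public class ReportedCycleEvictor implements Evictor<ImaginePaperData, GlobalWindow> {

        @Override
        public void evictBefore(Iterable<TimestampedValue<ImaginePaperData>> elements,
                int size, GlobalWindow window, EvictorContext ctx) {
            // nothing to evict before the window function runs
        }

        @Override
        public void evictAfter(Iterable<TimestampedValue<ImaginePaperData>> elements,
                int size, GlobalWindow window, EvictorContext ctx) {
            // When the trigger fires, the window holds the finished cycle plus
            // the first element of the new one. The newest lunum marks the cycle
            // that is still open; everything older has just been reported on.
            int latest = Integer.MIN_VALUE;
            for (TimestampedValue<ImaginePaperData> e : elements) {
                latest = Math.max(latest, e.getValue().lunum);
            }
            for (Iterator<TimestampedValue<ImaginePaperData>> it = elements.iterator(); it.hasNext(); ) {
                if (it.next().getValue().lunum < latest) {
                    it.remove();
                }
            }
        }
    }

With this shape, ImaginePaperWindowReportFunction sees the finished cycle plus the first element of the new one, and should ignore the latter -- that's step (2). One caveat: the last cycle never sees a newer lunum, so nothing will fire for it; you'd still need some extra signal at the end of the stream (for instance, an event-time timer that fires on the final watermark) to flush it.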
David Anderson | Training Coordinator

--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time