Hello people!

I have a DataStream whose events carry a continuing number that signifies which production cycle they belong to. In essence, this is what the data looks like:

    value, production cycle
    12.0,  2000
    12.3,  2000    one production cycle
    12.2,  2000
    0.0,   2001
    0.4,   2002    another production cycle
    1.1,   2002
    55.0,  2003
    60.0,  2003    another production cycle
    70.0,  2003

I have to do some calculations over the events of each production cycle, and I want to use Flink's window API for that. This is how I'm doing it right now:

    DataStream<String> test = streamExecEnv
        .readTextFile("C:/Projects/Python/testdata.txt")
        .map(new ImaginePaperDataConverterTest())                   // convert data to POJO
        .assignTimestampsAndWatermarks(new ImaginePaperAssigner())  // assign timestamps for event time
        .keyBy((ImaginePaperData event) -> event.lunum)             // <- the production cycle number
        .window(GlobalWindows.create())                             // create global window
        .trigger(new LunumTrigger())                                // "split" the window with a custom trigger
        .process(new ImaginePaperWindowReportFunction());           // apply a function over the aggregated events

I'm reading the DataStream from a text file, just for testing purposes. The problem is that this only aggregates a single event per production cycle. Why is that? I thought keying the stream by the production cycle number already partitions the stream anyway. The trigger is meant to fire when the production cycle number changes, so that a new global window is started and the events of the current window are aggregated.
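For reference, the grouping expected from the sample data above can be sketched in plain Java, without Flink. This is only an illustration of the desired result, not of how the window operator works. The class CycleGroupingSketch, its Event type and the method groupByCycle are hypothetical names made up for this sketch; Event merely stands in for ImaginePaperData.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CycleGroupingSketch {

    /** Hypothetical stand-in for ImaginePaperData: a value plus its production cycle number. */
    static final class Event {
        final double value;
        final int lunum;

        Event(double value, int lunum) {
            this.value = value;
            this.lunum = lunum;
        }
    }

    /** Collects the values of each production cycle, keeping cycles in arrival order. */
    static Map<Integer, List<Double>> groupByCycle(List<Event> events) {
        Map<Integer, List<Double>> cycles = new LinkedHashMap<>();
        for (Event e : events) {
            cycles.computeIfAbsent(e.lunum, k -> new ArrayList<>()).add(e.value);
        }
        return cycles;
    }

    public static void main(String[] args) {
        // The sample rows from the post.
        List<Event> events = Arrays.asList(
            new Event(12.0, 2000), new Event(12.3, 2000), new Event(12.2, 2000),
            new Event(0.0, 2001),
            new Event(0.4, 2002), new Event(1.1, 2002),
            new Event(55.0, 2003), new Event(60.0, 2003), new Event(70.0, 2003));

        // One line per production cycle with the number of events it contains.
        groupByCycle(events).forEach((lunum, values) ->
            System.out.println(lunum + " -> " + values.size() + " events"));
    }
}
```

For the sample data this yields three events for cycle 2000, one for 2001, two for 2002 and three for 2003, which is the per-cycle aggregation the window function is supposed to receive.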
What am I missing here?

Just to be safe, here is my implementation of the custom trigger:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

    public class LunumTrigger extends Trigger<ImaginePaperData, GlobalWindow> {

        private static final long serialVersionUID = 1L;

        public LunumTrigger() {}

        // Descriptor for the state holding the previously seen production cycle number
        private final ValueStateDescriptor<Integer> prevLunum =
                new ValueStateDescriptor<>("lunum", Integer.class);

        @Override
        public TriggerResult onElement(ImaginePaperData element, long timestamp,
                GlobalWindow window, TriggerContext ctx) throws Exception {

            ValueState<Integer> lunumState = ctx.getPartitionedState(prevLunum);

            // Fire when no cycle number is stored yet or when it differs from the stored one
            if (lunumState.value() == null || !(element.lunum.equals(lunumState.value()))) {
                System.out.println("LUNUM BEFORE: " + lunumState.value()
                        + " NEW LUNUM: " + element.lunum + " ==> FIRE!");
                lunumState.update(element.lunum);
                return TriggerResult.FIRE_AND_PURGE;
            }

            System.out.println("LUNUM BEFORE: " + lunumState.value()
                    + " NEW LUNUM: " + element.lunum + " ==> HOLD!");
            lunumState.update(element.lunum);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window,
                TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window,
                TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(prevLunum).clear();
        }
    }

I'm very grateful for your help.

Regards,
Daniel