GlobalWindow with custom trigger doesn't work correctly

GlobalWindow with custom trigger doesn't work correctly

Daniel Krenn
Hello people!

I have a DataStream whose events carry a running number that identifies the production cycle they belong to. In essence, this is what the data looks like:

value, production cycle
12.0, 2000
12.3, 2000 one production cycle
12.2, 2000

0.0, 2002
0.4, 2002 another production cycle
1.1, 2002

55.0, 2003
60.0, 2003 another production cycle
70.0, 2003

I have to do some calculations over the events of each production cycle. I want to use Flink's window API for that. This is how I'm doing it right now:

DataStream<String> test = streamExecEnv.readTextFile("C:/Projects/Python/testdata.txt")
    .map(new ImaginePaperDataConverterTest()) // convert data to POJO
    .assignTimestampsAndWatermarks(new ImaginePaperAssigner()) // Assign timestamps for event time
    .keyBy((ImaginePaperData event) -> event.lunum) //<- the production cycle number
    .window(GlobalWindows.create()) // create global window
    .trigger(new LunumTrigger()) // "split" the window with a custom trigger
    .process(new ImaginePaperWindowReportFunction()); // apply a function over the aggregated events

I'm reading the "DataStream" from a text file, just for testing purposes. The problem is that only a single event is aggregated per production cycle. Why is that? I thought keying the stream by the production cycle number already partitions the stream anyway. My intent is that when the production cycle number changes, the trigger fires, the events of the current window are aggregated, and a new global window starts. What am I missing here?
Just to be safe, here is my implementation of the custom trigger:

public class LunumTrigger extends Trigger<ImaginePaperData, GlobalWindow> {

    private static final long serialVersionUID = 1L;

    private final ValueStateDescriptor<Integer> prevLunum =
            new ValueStateDescriptor<>("lunum", Integer.class);

    public LunumTrigger() {}

    @Override
    public TriggerResult onElement(ImaginePaperData element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Integer> lunumState = ctx.getPartitionedState(prevLunum);

        if (lunumState.value() == null || !element.lunum.equals(lunumState.value())) {
            System.out.println("LUNUM BEFORE: " + lunumState.value() + " NEW LUNUM: " + element.lunum + " ==> FIRE!");
            lunumState.update(element.lunum);
            return TriggerResult.FIRE_AND_PURGE;
        }

        System.out.println("LUNUM BEFORE: " + lunumState.value() + " NEW LUNUM: " + element.lunum + " ==> HOLD!");
        lunumState.update(element.lunum);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(prevLunum).clear();
    }
}

I'm very grateful for your help.

Regards,

Daniel


Re: GlobalWindow with custom trigger doesn't work correctly

David Anderson
Windowing and triggering on a keyed stream is done independently for each key. Since you key by the production cycle number, each cycle gets its own key, and for each key your custom trigger observes lunumState change exactly once, from null to that cycle's number. It can never change again, because only stream elements with the same key are processed in the context of that piece of partitioned state. That first transition returns FIRE_AND_PURGE while the window still holds only its first element, which is why each cycle reports exactly one event.
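To make this concrete, here is a small stand-alone simulation (plain Java, not the Flink APIs; names are mine) of your trigger's per-key logic. Each key's window buffer and prevLunum state are kept separately, exactly as Flink scopes partitioned state, and you can see that every key fires exactly once, with exactly one element in the window:

```java
import java.util.*;

// Hypothetical stand-alone simulation of the trigger's per-key behavior.
public class KeyedTriggerSimulation {

    // Returns, per key, the sizes of the windows that were fired-and-purged.
    static Map<Integer, List<Integer>> firedWindowSizes(int[] lunums) {
        Map<Integer, List<Integer>> fired = new HashMap<>();
        Map<Integer, List<Integer>> window = new HashMap<>();  // per-key window contents
        Map<Integer, Integer> prevLunum = new HashMap<>();     // per-key ValueState<Integer>

        for (int lunum : lunums) {
            int key = lunum; // keyBy(event -> event.lunum)
            window.computeIfAbsent(key, k -> new ArrayList<>()).add(lunum);
            Integer prev = prevLunum.get(key);
            if (prev == null || prev != lunum) {
                // FIRE_AND_PURGE: report the window for this key, then clear it
                fired.computeIfAbsent(key, k -> new ArrayList<>())
                     .add(window.get(key).size());
                window.get(key).clear();
            }
            prevLunum.put(key, lunum); // within one key this never changes again
        }
        return fired;
    }

    public static void main(String[] args) {
        int[] lunums = {2000, 2000, 2000, 2002, 2002, 2002, 2003, 2003, 2003};
        System.out.println(firedWindowSizes(lunums));
        // Every key fires exactly once, with a single element in the window:
        // {2000=[1], 2002=[1], 2003=[1]}
    }
}
```

The later elements of each cycle stay buffered in their key's window forever, because the state for that key never sees a different lunum again.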

One advantage of windowing on keyed streams is the parallelism that's made possible by partitioning by key -- but in your case there's probably little to be gained, assuming the production cycles are sequential, rather than overlapping. You could proceed by (1) not keying the stream, (2) adapting ImaginePaperWindowReportFunction to only process events for the cycle that just ended (if necessary), and (3) writing a custom evictor to remove events once they've been reported on.
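A minimal sketch of that un-keyed approach, again in plain Java rather than the Flink Trigger/Evictor interfaces (class and method names are mine): one shared buffer, fire when the cycle number changes, report only the events of the cycle that just ended, then evict them:

```java
import java.util.*;

// Plain-Java sketch of the un-keyed approach: one shared buffer,
// fire on cycle change, report only the ended cycle, then evict it.
public class UnkeyedCycleReporter {

    // Returns, per completed cycle, the values reported for it.
    static List<List<Double>> report(double[] values, int[] lunums) {
        List<List<Double>> reports = new ArrayList<>();
        List<double[]> window = new ArrayList<>(); // un-keyed buffer of {value, lunum}
        Integer prevLunum = null;

        for (int i = 0; i < values.length; i++) {
            if (prevLunum != null && lunums[i] != prevLunum) {
                final int ended = prevLunum; // the cycle that just finished
                List<Double> cycle = new ArrayList<>();
                for (double[] e : window) {
                    if ((int) e[1] == ended) cycle.add(e[0]); // process only the ended cycle
                }
                reports.add(cycle);
                window.removeIf(e -> (int) e[1] == ended); // "evictor": drop reported events
            }
            window.add(new double[]{values[i], lunums[i]});
            prevLunum = lunums[i];
        }
        return reports;
    }

    public static void main(String[] args) {
        double[] values = {12.0, 12.3, 12.2, 0.0, 0.4, 1.1};
        int[] lunums   = {2000, 2000, 2000, 2002, 2002, 2002};
        System.out.println(report(values, lunums));
        // -> [[12.0, 12.3, 12.2]]  (cycle 2000 completes when 2002 begins;
        //     cycle 2002 is still open when the input ends)
    }
}
```

Because the cycles are sequential, the buffer at firing time holds only the ended cycle's events plus the one new element, so the filter and evictor stay cheap.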

On Tue, Jan 22, 2019 at 7:52 PM Daniel Krenn <[hidden email]> wrote:



--
David Anderson | Training Coordinator
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time