Hi All,
I'm trying to understand how to create a sample trigger. Let's say that I have a stream like this one:
Event: "YELLOW, BLUE, WHITE, RED, GREEN, RED, GREEN, RED, YELLOW, YELLOW"
Event: "YELLOW, BLUE, BLACK, RED, BLUE, RED, PINK, RED, YELLOW, YELLOW"
My stream is then mapped to produce an enriched stream with an additional Boolean field that defines the moment at which the window should be triggered. (I'm using this two-step approach to also calculate a value for a dynamic session window parameter, but I cut it out of this example to keep it simple; strictly speaking I should be using a Tuple3.) In this specific example, if two consecutive messages with 2 "yellow" are seen, MyFunctionToAddTheBoolean() will emit "true".
DataStream<Tuple2<Event, Boolean>> enriched = stream
    .keyBy(...)
    .map(new MyFunctionToAddTheBoolean());
Enriched:Tuple2<stream_A, Boolean>
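The rule behind MyFunctionToAddTheBoolean() can be sketched in plain Java, without any Flink classes. This is only an illustration under one reading of the description: the flag is "true" when the current message and the one before it each contain at least two "YELLOW" tokens. The class name ConsecutiveYellowDetector and its methods are hypothetical, and in a real keyed map function the remembered value would be held in Flink keyed state rather than in a plain field.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: mirrors the rule that
// MyFunctionToAddTheBoolean() is described as applying.
class ConsecutiveYellowDetector {
    // Whether the previous message contained at least two "YELLOW" tokens.
    // Assumption: in a Flink job this would be keyed state, not a field.
    private boolean previousHadTwoYellows = false;

    // Counts the "YELLOW" tokens in a comma-separated message.
    static long countYellows(String message) {
        return Arrays.stream(message.split(","))
                .map(String::trim)
                .filter("YELLOW"::equals)
                .count();
    }

    // Returns true when this message and the previous one both
    // contained at least two "YELLOW" tokens.
    boolean flag(String message) {
        boolean currentHasTwo = countYellows(message) >= 2;
        boolean result = currentHasTwo && previousHadTwoYellows;
        previousHadTwoYellows = currentHasTwo;
        return result;
    }
}
```

With the two sample events above, the first call to flag(...) would return false (there is no earlier message yet) and the second would return true.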
With this new stream called "Enriched", I'm going to move on to the second step where I would like to use the parameter to trigger the window processing.
DataStream<String> WinStream = enriched
    .keyBy(new MyKeySelector())
    .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()))
    .trigger(new MySuperTriggerFunction()) // ?
    .process(new MyProcessWindowFunction());
My questions are:
1) Would it be possible to have an example of how to write a function (MySuperTriggerFunction()) that can do this?
2) How can DynamicSessionWindows() and MySuperTriggerFunction() work together? With DynamicSessionWindows() I'm telling Flink to process my data if the gap is greater than '1000' millis, but on the other hand I'm also supplying a trigger(). Would the application be able to follow both and run the ProcessWindowFunction if either of the two conditions is met, or do I have to decide which of the two should be used, or prioritise one?
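To make the "either of the two" behaviour in question 2 concrete, here is a minimal plain-Java model of the decision such a trigger would have to take. It is a sketch under assumptions, not Flink code: Decision and EitherConditionRule are hypothetical stand-ins, and the parameters (the element's flag, the window's last timestamp, the current watermark) are passed in by hand. Since trigger(...) replaces the assigner's default trigger, the model covers both conditions itself: the Boolean flag and the end of the session window.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
enum Decision { CONTINUE, FIRE }

class EitherConditionRule {
    // Decision taken when a new element arrives in the window.
    //   flag               - the Boolean carried by the enriched element
    //   windowMaxTimestamp - last timestamp that belongs to the session window
    //   currentWatermark   - how far event time has progressed
    static Decision onElement(boolean flag, long windowMaxTimestamp, long currentWatermark) {
        if (flag) {
            return Decision.FIRE;          // condition 1: the element asks for it
        }
        if (windowMaxTimestamp <= currentWatermark) {
            return Decision.FIRE;          // condition 2: the session has already ended
        }
        return Decision.CONTINUE;          // otherwise keep collecting
    }

    // Decision taken when an event-time timer goes off.
    // Only a timer set for the window's end fires the window.
    static Decision onEventTime(long timerTimestamp, long windowMaxTimestamp) {
        return timerTimestamp == windowMaxTimestamp ? Decision.FIRE : Decision.CONTINUE;
    }
}
```

A real implementation would extend Flink's Trigger class, register an event-time timer for the window's end when an element arrives, and, because session windows are merged, also support merging.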
I have been reading:
Where I learned that: A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).
And:
But still... Some help would be really appreciated!
Thanks!
Simone