Unable to query/print the incomplete bucket state

Unable to query/print the incomplete bucket state

Falak Kansal
Hi, greetings

I am applying window operations on a DataStream and then applying some transformation (it could be anything). Say the window size is 1 minute, data arrives with strictly increasing timestamps, and the watermark interval is 1 ms (checkpointing is also enabled). At any moment there is one open window that is constantly receiving data. If I try to query this bucket (state) using queryable state, I don't get any results. Similarly, if I print the minMaxTempPerWindow stream, output appears only when the bucket is finalized. I am not able to retrieve results until the window is finalized; for all finalized buckets I am able to query the results.

With a window size of 10 minutes, I would not be able to query the data for up to 10 minutes, which makes this unfit for real-time streaming use cases. I think there must be some way to query the intermediate state of a window. Help would be appreciated. Thank you.


Below is the code; `state` is what I query later using QueryableStateClient.
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment

// checkpoint every 10 seconds
env.getCheckpointConfig.setCheckpointInterval(10 * 1000)

// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// emit watermarks every 1 ms
env.getConfig.setAutoWatermarkInterval(1L)

// ingest sensor stream
val sensorData: DataStream[SensorReading] = env
  // SensorSource generates random temperature readings
  .addSource(new ResettableSensorSource)
  // assign timestamps and watermarks which are required for event time
  .assignTimestampsAndWatermarks(new SensorTimeAssigner)

// min/max temperature per sensor id over 1-minute tumbling event-time windows
val minMaxTempPerWindow = sensorData
  .keyBy(_.id)
  .window(TumblingEventTimeWindows.of(Time.seconds(60)))
  .process(new HighAndLowTempProcessFunction)

// expose the per-window results as queryable state, keyed by window end timestamp
val state = minMaxTempPerWindow.keyBy(_.endTs).asQueryableState("highAndLowTemperature")
Thank you
Falak

Re: Unable to query/print the incomplete bucket state

Guowei Ma
Hi, Falak
>>>Now if I try to query this bucket (state) using a queryable state, then I don't get the results.
AFAIK, Flink does not provide a way for users to query the state of the `WindowOperator`. Supporting that would mean exposing the window operator's internal implementation, which would be difficult to maintain whenever that implementation changes.

>>>Similarly if I print the minMaxTempPerWindow stream, it is printed only when the bucket is finalized.
This is because `TumblingEventTimeWindows` only emits the result at the end of the window: its default trigger fires once the watermark passes the end of the window. If you want to get the result "quickly", you could customize the window's trigger [1] yourself.
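
As a rough sketch of the trigger approach, one option is Flink's built-in `ContinuousEventTimeTrigger` rather than a hand-written trigger. The snippet below reuses `sensorData` and `HighAndLowTempProcessFunction` from the original post; the 10-second firing interval is only an illustrative assumption, not something from the thread.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Assumes sensorData and HighAndLowTempProcessFunction as defined in the original post.
val minMaxTempPerWindow = sensorData
  .keyBy(_.id)
  .window(TumblingEventTimeWindows.of(Time.seconds(60)))
  // Replace the default trigger: fire periodically as the watermark advances
  // (every 10 s of event time here, an illustrative value) and again at window end.
  .trigger(ContinuousEventTimeTrigger.of[TimeWindow](Time.seconds(10)))
  .process(new HighAndLowTempProcessFunction)
```

Each firing calls the process function with all elements buffered in the window so far, so the same window emits several results with the same end timestamp. With the downstream `keyBy(_.endTs).asQueryableState(...)` from the post, the queryable state should then hold the latest partial result for the open window, although that is worth verifying against your Flink version.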


On Mon, Jan 25, 2021 at 12:47 PM Falak Kansal <[hidden email]> wrote: