How to print the aggregated state every time it is updated?

kant kodali
Hi All,

I have a custom aggregated state that is represented by a `Set<Long>`, and I have a stream of values coming in from Kafka, which I inspect, compute the custom aggregation over, and store in the `Set<Long>`. Now I am trying to figure out how to print the updated value every time this state is updated.

Imagine I have a `DataStream<Set<Long>>`.

I have tried a few things already but keep running into the following exception, and I'm not sure why. Do I need to call `assignTimestampsAndWatermarks`? I thought watermarks were not mandatory in Flink, especially since I want to keep this aggregated state forever. Any simple code sample showing how to print the streaming aggregated state represented by a `DataStream<Set<Long>>` would be great! You can assume my `Set<Long>` has a `toString()` method that takes care of printing, and I just want to see those values in stdout.

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

Re: How to print the aggregated state every time it is updated?

Congxian Qiu
Hi

From the description, it sounds like you are using a window operator with event time. In that case you need to call `DataStream.assignTimestampsAndWatermarks` to assign timestamps and watermarks: an event-time window is only triggered once the watermark passes the end of the window.
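
A plain-Java sketch of these two event-time rules may help explain the exception. It deliberately uses only the JDK, not the Flink API, and all class and method names are illustrative assumptions: `windowStart` assigns a record to a tumbling window (offset 0) and refuses a record whose timestamp is `Long.MIN_VALUE`, the "no timestamp" marker from the error message; `fires` treats a window `[start, start + size)` as ready once the watermark reaches its last millisecond; and `watermark` is a hypothetical bounded-out-of-orderness rule (largest timestamp seen minus a fixed bound), roughly what a watermark assigner passed to `assignTimestampsAndWatermarks` computes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-JDK model of event-time window rules; NOT the Flink API.
class EventTimeWindowModel {

    // Flink marks "no timestamp" with Long.MIN_VALUE (see the exception above).
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Start of the tumbling window of `size` ms (offset 0) that contains `ts`.
    static long windowStart(long ts, long size) {
        if (ts == NO_TIMESTAMP) {
            // Without a timestamp there is no way to pick an event-time window.
            throw new IllegalStateException("Record has no timestamp; assign timestamps first");
        }
        return ts - Math.floorMod(ts, size);
    }

    // A window [start, start + size) fires once the watermark reaches its last millisecond.
    static boolean fires(long start, long size, long watermark) {
        return watermark >= start + size - 1;
    }

    // Hypothetical bounded-out-of-orderness rule: max timestamp seen minus a fixed bound.
    static long watermark(List<Long> seenTimestamps, long maxOutOfOrderness) {
        if (seenTimestamps.isEmpty()) {
            return Long.MIN_VALUE; // event time has not advanced yet
        }
        long max = Long.MIN_VALUE;
        for (long ts : seenTimestamps) {
            max = Math.max(max, ts);
        }
        return max - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        long size = 1_000; // 1-second tumbling windows
        long start = windowStart(1_500, size);
        List<Long> seen = new ArrayList<>(Arrays.asList(1_500L, 1_900L));
        System.out.println("window " + start + " fires: " + fires(start, size, watermark(seen, 200)));
        seen.add(2_300L);
        System.out.println("window " + start + " fires: " + fires(start, size, watermark(seen, 200)));
    }
}
```

With 1-second windows and a 200 ms bound, the window starting at 1000 stays open after records at 1500 and 1900 (watermark 1700) and fires once a record at 2300 moves the watermark to 2100. Without timestamps and watermarks, an event-time window can never be assigned or fired, which is what the exception reports.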

Best,
Congxian


In reply to kant kodali <[hidden email]>, Wed, Mar 4, 2020, 5:11 AM.

Re: How to print the aggregated state every time it is updated?

kant kodali
Hi,

Thanks for this. So how can I emulate an infinite window while still outputting every second? Simply put, I want to keep the state forever (say, for years), and since RocksDB is my state backend, I am assuming I can store the state until I run out of disk. However, I want to see all the updates to the state every second. It sounds to me like I need a window of one second, compute the result for that window, and pass it on to the next window. Or is there some other way?

Thanks

In reply to Congxian Qiu <[hidden email]>, Fri, Mar 6, 2020, 1:33 AM.

Re: How to print the aggregated state every time it is updated?

rmetzger0
Hey,

I don't think you need to use a window operator for this use case. A reduce (or fold) operation should be enough: https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
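
Robert's suggestion can be modeled in plain Java (again not the Flink API; `RunningReduceModel` and `UNION` are hypothetical names) to show why no window or watermark is needed: a running reduce merges each new value into the accumulated `Set<Long>` and emits the updated aggregate immediately. In Flink this would roughly correspond to mapping each value to a singleton set, `keyBy(...)` (a reduce is only available on a keyed stream; keying by a constant yields a single global set at the cost of parallelism), `reduce(...)` with a set union, and `print()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BinaryOperator;

// Plain-JDK model of a running reduce; NOT the Flink API.
class RunningReduceModel {

    // The reduce function: union of two partial aggregates, returned as a new set.
    static final BinaryOperator<Set<Long>> UNION = (a, b) -> {
        Set<Long> merged = new HashSet<>(a);
        merged.addAll(b);
        return merged;
    };

    // Returns the aggregate emitted after each input element, in order.
    static List<Set<Long>> runningReduce(List<Long> input) {
        List<Set<Long>> emitted = new ArrayList<>();
        Set<Long> state = null; // nothing accumulated before the first element
        for (long value : input) {
            // Each value is first turned into a singleton set so the input and
            // output of the reduce have the same type, Set<Long>.
            Set<Long> element = Collections.singleton(value);
            if (state == null) {
                state = new HashSet<>(element);
            } else {
                state = UNION.apply(state, element);
            }
            // Emit a read-only view; later updates build new sets, so it never changes.
            emitted.add(Collections.unmodifiableSet(state));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Stand-in for print(): write each emitted aggregate's toString() to stdout.
        for (Set<Long> snapshot : runningReduce(Arrays.asList(3L, 1L, 3L, 2L))) {
            System.out.println(snapshot);
        }
    }
}
```

Note that the repeated `3` still produces an output: a reduce emits the current aggregate for every record, whether or not the set actually changed.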


In reply to kant kodali <[hidden email]>, Fri, Mar 6, 2020, 11:50 AM.

Re: How to print the aggregated state every time it is updated?

Arvid Heise-3
Hi Kant,

If you only want to output once per second, you probably want to use a ProcessFunction with timers [1].

Basically, this function holds the state and manages the updates to it. The updates should also be stored in a local (non-state) variable, say `changes`. Whenever the timer fires, you would output `changes` (possibly to a side output) and reset it.
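
A plain-Java model of this pattern (not Flink code; all names are hypothetical) may make the division of labor clearer: `state` stands in for the Flink state holding the full set, `changes` is the plain field that collects newly added values, `onElement` plays the role of `processElement`, and `onTimer` emits and resets the buffered changes. In Flink, timers are only available on a keyed stream, and the once-per-second tick would come from registering a processing-time timer through `ctx.timerService().registerProcessingTimeTimer(...)` and registering the next one inside `onTimer`.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-JDK model of "keep full state, emit buffered changes on a timer"; NOT the Flink API.
class TimerFlushModel {

    // Stands in for the Flink state holding the full aggregate (e.g. in RocksDB).
    private final Set<Long> state = new HashSet<>();

    // Plain field, as suggested above: in Flink it would NOT be checkpointed.
    private final Set<Long> changes = new HashSet<>();

    // Corresponds to processElement: update the state and remember real changes only.
    void onElement(long value) {
        if (state.add(value)) { // add() returns false if the value was already present
            changes.add(value);
        }
    }

    // Corresponds to onTimer: emit the changes since the last tick, then reset them.
    Set<Long> onTimer() {
        Set<Long> out = new HashSet<>(changes);
        changes.clear();
        return out;
    }

    // The full aggregate, e.g. if you prefer to print everything on each tick.
    Set<Long> fullState() {
        return new HashSet<>(state);
    }

    public static void main(String[] args) {
        TimerFlushModel f = new TimerFlushModel();
        f.onElement(1);
        f.onElement(2);
        System.out.println("tick 1 changes: " + f.onTimer());
        f.onElement(2);
        f.onElement(3);
        System.out.println("tick 2 changes: " + f.onTimer());
        System.out.println("full state: " + f.fullState());
    }
}
```

Because `changes` is an ordinary field rather than Flink state, it is not part of checkpoints, so changes buffered when a failure happens may never be emitted after recovery; keeping them in state as well would avoid that, at the cost of extra state access per record.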


In reply to Robert Metzger <[hidden email]>, Fri, Mar 6, 2020, 4:39 PM.