multiple processing of streams


robert.lancaster

Is it possible to process the same stream in two different ways?  I can’t find anything in the documentation definitively stating that this is possible, nor anything stating that it isn’t.  My attempt had some unexpected results, which I’ll explain below.

 

Essentially, I have a stream of data I’m pulling from Kafka.  I want to build aggregate metrics on this data set using both tumbling windows and session windows.  So, I do something like the following:

 

DataStream<MyRecordType> baseStream = env
        .addSource(…)                                  // pull data from Kafka
        .map(…)                                        // parse the raw input
        .assignTimestampsAndWatermarks(…);

DataStream<Tuple..<…>> timeWindowedStream = baseStream
        .keyBy(…)
        .timeWindow(…)                                 // tumbling window
        .apply(…);                                     // aggregation over the tumbling window

DataStream<Tuple..<…>> sessionWindowedStream = baseStream
        .keyBy(…)
        .window(EventTimeSessionWindows.withGap(…))    // session window
        .apply(…);                                     // aggregation over the session window

 

The issue is that when I view my job in the Flink dashboard, it indicates that each type of windowing is only receiving half of the records.  Is what I’m trying simply unsupported, or is there something I’m missing?

 

Thanks!

 

 

 

               

 

 

 



The information contained in this communication is confidential and intended only for the use of the recipient named above, and may be legally privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. If you have received this communication in error, please resend it to the sender and delete the original message and copy of it from your computer system. Opinions, conclusions and other information in this message that do not relate to our official business should be understood as neither given nor endorsed by the company.
Re: multiple processing of streams

Fabian Hueske-2
Hi Robert,

It is certainly possible to feed the same DataStream into two (or more) operators.
Both operators should then process the complete input stream.

What you describe is unintended behavior.
Can you explain how you determined that each window operator receives only half of the events?

Thanks,
Fabian





Re: multiple processing of streams

robert.lancaster

Hi Fabian,

 

Thanks for the response.  It turns out this was a red herring.  I knew how many events I was sending through the pipeline, and the output of each window aggregate came out to roughly half of what I expected.  What I hadn’t realized, however, was that the job was failing with an out-of-heap error before it finished, so not all records were processed.  I believe the heap issue is caused by sessions that accumulate a very large number of records per key (and/or never see a period of inactivity that would end the session).  So I’m looking at a way to customize the EventTimeSessionWindow, or perhaps create a custom Trigger, to force a session to close after either X seconds of inactivity, Y seconds of total duration, or perhaps Z events.
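A trigger along those lines might look roughly like the sketch below. This is a hedged illustration, not a tested implementation: the class name and the element-count cap are hypothetical, the plain counter field would need to live in Flink’s partitioned state to survive failures, and the exact Trigger API varies between Flink versions.

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Sketch: fire a session window on the usual event-time timer, or force it
// closed once a maximum number of elements has been seen ("Z events").
public class BoundedSessionTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;   // hypothetical cap on elements per window
    private transient long seen;   // NOT fault-tolerant; use partitioned state in real code

    public BoundedSessionTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());   // normal session end
        if (++seen >= maxCount) {
            seen = 0;
            return TriggerResult.FIRE_AND_PURGE;             // force the session closed
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // the timer registered above came due: the gap elapsed, end the session normally
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }
}
```

A duration cap (“Y seconds”) could be handled similarly by registering a second event-time timer at the window start plus the cap and firing when it comes due.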

 

 

 

Re: multiple processing of streams

Fabian Hueske-2
Hi Robert,

thanks for the update.
Regarding the session window: if you can implement your window logic as a ReduceFunction + WindowFunction (see incremental window aggregation [1]), your window state will be independent of the number of elements in the window. If that is not possible, you may have to go with the custom trigger approach you described.
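That suggestion, sketched against the session-windowed part of the original pipeline. This is only an illustration: the key selector, the gap length, and the mergeCountsFrom method are hypothetical stand-ins for the arguments elided with “…” in the original snippet.

```java
DataStream<MyRecordType> sessionAggregates = baseStream
    .keyBy(r -> r.getKey())                                      // hypothetical key selector
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))   // hypothetical gap
    .apply(
        // ReduceFunction: aggregates eagerly, so the window keeps a single
        // element per key instead of buffering every record in the session.
        (ReduceFunction<MyRecordType>) (a, b) -> a.mergeCountsFrom(b),
        // WindowFunction: called when the session closes, with the one
        // pre-aggregated element; attach window metadata here if needed.
        new WindowFunction<MyRecordType, MyRecordType, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window,
                              Iterable<MyRecordType> preAggregated,
                              Collector<MyRecordType> out) {
                out.collect(preAggregated.iterator().next());
            }
        });
```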

Best, Fabian

Re: multiple processing of streams

robert.lancaster

Hi Fabian,

 

Ah, that is good stuff, thanks.  I’ll have to evaluate, but I believe everything I’m calculating can be done this way, though it looks like FoldFunction + WindowFunction is better suited to my use case.

 

I may still need a custom trigger as well, though.  Some sessions may never end, or may last so long that the information is no longer timely by the time the window closes.  So I might still need to end those sessions early in order to get data about them (or rely on the information gathered from my tumbling windows).  But I’ll start with Fold + Window first, which should get rid of my heap issues.

 

Thanks!

 

Re: multiple processing of streams

Fabian Hueske-2
Hi Robert,

Unfortunately, a FoldFunction can only be used for eager aggregation in tumbling and sliding windows.
For session windows, only a ReduceFunction can be used. The problem is that two session windows might be merged when a late event arrives that "connects" them. In that case, the window operator needs to combine two intermediate results of the same type. This is not possible with a FoldFunction, whose input type and result type may differ.
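The type argument can be seen without any Flink machinery. Below is a minimal, standalone Java illustration (the values are arbitrary): a reduce-style function can combine two partial window results because accumulator and input share one type, while a fold-style function leaves two accumulators that it has no way to combine.

```java
import java.util.function.BinaryOperator;
import java.util.function.BiFunction;

public class MergeDemo {
    public static void main(String[] args) {
        // Reduce: input and accumulator share one type, so two partial
        // session results can be merged with the same function.
        BinaryOperator<Integer> reduce = Integer::sum;
        int windowA = reduce.apply(1, 2);              // partial result of session A
        int windowB = reduce.apply(3, 4);              // partial result of session B
        int merged  = reduce.apply(windowA, windowB);  // late event connected the sessions
        System.out.println(merged);                    // prints 10

        // Fold: accumulator type (String) differs from input type (Integer).
        // The function alone cannot combine two String accumulators, which is
        // why fold cannot support mergeable (session) windows.
        BiFunction<String, Integer, String> fold = (acc, x) -> acc + x;
        String accA = fold.apply(fold.apply("", 1), 2);  // "12"
        String accB = fold.apply(fold.apply("", 3), 4);  // "34"
        // fold.apply(accA, accB);  // does not type-check: accB is not an Integer
        System.out.println(accA + "|" + accB);           // prints 12|34
    }
}
```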

Best, Fabian
