Setting source vs sink vs window parallelism with data increase

Padarn Wilson-2
Hi all,

I'm trying to process many records, and I have an expensive operation I'm trying to optimize. Simplified, it is something like:

Data: (key1, count, time)

Source -> Map(x => (x, newKeyList(x.key1)))
            -> FlatMap(x => x._2.map(y => (y, x._1.count, x._1.time)))
            -> KeyBy(_.key1).TumblingWindow().apply..
            -> Sink

In the Map -> FlatMap step, each key is mapped to a set of keys, and each of those is then set as the new key. This effectively increases the size of the stream by 16x.

What I am trying to figure out is how to set the parallelism of my operators. I see in some comments that people suggest your source, sink and aggregation should have different parallelism, but I'm not clear on exactly why, or what this means for CPU utilization. 

Also, it isn't clear to me the best way to handle this increase in data within the stream itself.

Thanks
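
On the parallelism question: in Flink, parallelism can be set per operator by calling `setParallelism(n)` on the result of `map`, `flatMap`, the window `apply`, or `addSink`, so the stages after the 16x fan-out can be given more subtasks than the source. A rough way to pick the numbers is to divide each stage's input rate by what one subtask can sustain. The sketch below does only that arithmetic. The class name, the source rate, and the per-subtask capacities are hypothetical placeholders, not measurements from this job.

```java
/** Back-of-envelope sizing for a pipeline whose FlatMap multiplies the record rate. */
final class ParallelismEstimate {

    /** Smallest number of subtasks whose combined capacity covers recordsPerSec. */
    static int subtasksNeeded(long recordsPerSec, long capacityPerSubtask) {
        if (recordsPerSec < 0 || capacityPerSubtask <= 0) {
            throw new IllegalArgumentException("rate must be >= 0 and capacity > 0");
        }
        // Integer ceiling division, with a floor of one subtask.
        long ceil = (recordsPerSec + capacityPerSubtask - 1) / capacityPerSubtask;
        return (int) Math.max(1, ceil);
    }

    public static void main(String[] args) {
        long sourceRate = 50_000;                 // hypothetical records/s read by the source
        int fanOut = 16;                          // from the post: ~16 output records per input
        long expandedRate = sourceRate * fanOut;  // rate seen by keyBy + window

        // Hypothetical capacities (records/s per subtask); measure them on the real job.
        System.out.println("source/map/flatMap: " + subtasksNeeded(sourceRate, 25_000));
        System.out.println("window:             " + subtasksNeeded(expandedRate, 40_000));
    }
}
```

With these made-up numbers the window stage would want roughly ten times the parallelism of the source (20 subtasks versus 2), which is one reason people suggest different parallelism for source, aggregation, and sink.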

Re: Setting source vs sink vs window parallelism with data increase

Padarn Wilson-2
Hi all again - following up on this I think I've identified my problem as being something else, but would appreciate if anyone can offer advice.

After running my stream for some time, I see that the old-generation garbage collector starts to take a very long time:
Screen Shot 2019-03-02 at 3.01.57 PM.png
Here the purple line is young-generation time, which is ever increasing but grows slowly, while the blue line is old-generation time.
This in itself is not a problem, but as soon as the next checkpoint is triggered after this happens, you see the following:
Screen Shot 2019-03-02 at 3.02.48 PM.png
It looks like the checkpoint hits a cap, but this is only because the checkpoints start to time out and fail (these are the alignment times per operator).

I do notice that my state is growing quite large over time, but I don't have a good understanding of why this would show up in the JVM old-generation metric, which appears to be the leading indicator before a problem is noticed. Other metrics, such as network buffers, also show that at checkpoint time things start to go haywire and the situation never recovers.

Thanks


Re: Setting source vs sink vs window parallelism with data increase

Piotr Nowojski-3
Hi,

What Flink version are you using?

Generally speaking, Flink might not be the best fit if your records fan out, as this may significantly increase checkpointing time.

However, you might want to first identify what’s causing the long GC times. If there are long GC pauses, this should be the first thing to fix.

Piotrek


Re: Setting source vs sink vs window parallelism with data increase

Piotr Nowojski-3
Re-adding user mailing list.


Hi,

If it is a GC issue, only GC logs or some JVM memory profilers (like Oracle’s Mission Control) can lead you to the solution. Once you confirm that it’s a GC issue, there are numerous resources online on how to analyse the cause of the problem. For that, it is difficult to use CPU profiling or Flink metrics, since GC issues caused by one thread can cause performance bottlenecks in other, unrelated places.

If it’s not a GC issue, you can use Flink metrics (like the amount of buffered input/output data) to find the task that’s causing a bottleneck. Then you can use a CPU profiler to analyse why that is happening.

Piotrek
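
For the GC-log route, on the Java 8 JVMs typical for Flink 1.7 the usual flags are `-XX:+PrintGCDetails`, `-XX:+PrintGCDateStamps` and `-Xloggc:<file>`, which can be passed to the Flink JVMs through `env.java.opts` in `flink-conf.yaml`. The per-collector counters behind young/old-generation graphs can also be read in-process through the standard `GarbageCollectorMXBean` API, which is handy for a quick check. The sketch below is a plain JVM program, not Flink code. The class name `GcSnapshot` is hypothetical, and which collectors are listed depends on the JVM and its GC settings.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

/** Prints how often each garbage collector has run and how long it has spent so far. */
final class GcSnapshot {

    /** One line per collector: name, collections so far, accumulated collection time. */
    static List<String> describeCollectors() {
        List<String> lines = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 when the JVM does not define the value.
            lines.add(String.format("%s: count=%d, timeMs=%d",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime()));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Churn through short-lived arrays so the collectors have something to do.
        long checksum = 0;
        for (int i = 0; i < 200_000; i++) {
            byte[] garbage = new byte[1024];
            garbage[0] = (byte) i;
            checksum += garbage[0];
        }
        System.out.println("checksum (keeps the loop observable): " + checksum);
        describeCollectors().forEach(System.out::println);
    }
}
```

Sampling these counters twice and comparing the deltas shows whether old-generation time is accelerating, which is the pattern described earlier in the thread.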

On 6 Mar 2019, at 02:52, Padarn Wilson <[hidden email]> wrote:

Hi Piotr,

Thanks for your feedback. Makes sense about the checkpoint barriers - this definitely could be the cause of a problem.

> I would advise profiling your job to find out what’s going on.

Agreed. Outside of inspecting the Flink metrics, do you have suggestions for tools with which to do this?

The main thing I'm trying to pin down is:
1) Is it the downstream processing of the expanded records that causes the problem, or
2) Is it the shuffle of all the records after the expansion that is taking a long time? If so, is there anything I can do to mitigate this other than trying to ensure less shuffling?

Thanks,
Padarn


On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski <[hidden email]> wrote:
Hi,

> Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time?

The problem is that when Flink starts a checkpoint and injects checkpoint barriers, those barriers travel through the job graph. The quicker they can do that, the better. How long it takes depends on the amount of data buffered ahead of the checkpoint barriers (currently all such records must be processed before the barrier is passed downstream). The more buffered records there are, and the more time it takes to process them, the longer the checkpoint takes. Obviously, if one stage in the job multiplies the number of records, it can in a way multiply the amount of “buffered work” that needs to be processed before the checkpoint barriers pass through.

However, that might not be the case for you. To analyse what’s going on you would need to look at various Flink metrics, like checkpoint times, back-pressured tasks, the state of the tasks’ input/output buffers, etc. More importantly, though, those are secondary issues. First of all, you should try to pinpoint the cause of the long GC pauses. If it comes from your code, you should fix this first. If that either isn’t the issue or doesn’t solve it, generally speaking I would advise profiling your job to find out what’s going on.

Piotrek
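
The barrier argument above can be made concrete with a toy model: a barrier sits behind whatever is already queued at an operator's input, and it is only forwarded once those records have been processed. The sketch below assumes a single input queue and a fixed per-record cost, and it ignores bounded network buffers and back pressure, which limit the effect in a real job. `BarrierDelay`, the buffer size, and the 2 ms cost are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model: how long a checkpoint barrier waits behind already-buffered records. */
final class BarrierDelay {

    /** Marker object standing in for a checkpoint barrier in the input queue. */
    static final Object BARRIER = new Object();

    /** Builds an input queue with the given number of records followed by a barrier. */
    static Deque<Object> queueWith(int bufferedRecords) {
        Deque<Object> queue = new ArrayDeque<>();
        for (int i = 0; i < bufferedRecords; i++) {
            queue.add("record-" + i);
        }
        queue.add(BARRIER);
        return queue;
    }

    /** Milliseconds spent processing the records queued ahead of the barrier. */
    static long millisUntilBarrier(Deque<Object> input, long millisPerRecord) {
        long elapsed = 0;
        for (Object element : input) {   // ArrayDeque iterates from head to tail
            if (element == BARRIER) {
                return elapsed;
            }
            elapsed += millisPerRecord;
        }
        throw new IllegalStateException("no barrier in queue");
    }

    public static void main(String[] args) {
        int buffered = 1_000;     // hypothetical records buffered before the fan-out
        int fanOut = 16;          // from the original post
        long millisPerRecord = 2; // hypothetical processing cost downstream

        System.out.println("no fan-out:  "
                + millisUntilBarrier(queueWith(buffered), millisPerRecord) + " ms");
        System.out.println("16x fan-out: "
                + millisUntilBarrier(queueWith(buffered * fanOut), millisPerRecord) + " ms");
    }
}
```

In this model the delay scales linearly with the fan-out factor, which is the sense in which a multiplying stage multiplies the "buffered work" ahead of the barrier.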

On 5 Mar 2019, at 02:00, Padarn Wilson <[hidden email]> wrote:

Hi Piotr,

Thanks for your response. I am using Flink 1.7.1 (intend to upgrade to 1.7.2 shortly - there is some S3 fix I'd like to take advantage of).

> Generally speaking, Flink might not be the best fit if your records fan out, as this may significantly increase checkpointing time.

Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time? 

> However, you might want to first identify what’s causing the long GC times.

My current plan is to try and enable GC logs and see if I can get something meaningful from them. 

Thanks a lot,
Padarn 



Re: Setting source vs sink vs window parallelism with data increase

Padarn Wilson-2
Thanks a lot for your suggestion. I’ll dig into it and update the mailing list if I find anything useful.

Padarn


Re: Setting source vs sink vs window parallelism with data increase

Padarn Wilson-2
Well.. it turned out I was registering millions of timers by accident, which was why garbage collection was blowing up. Oops. Thanks for your help again.
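
The thread does not say how the timers were being registered, so the following only illustrates one common way to end up with millions of them and one common mitigation. Flink keeps at most one timer per key and timestamp, so registering a timer at a distinct millisecond for every record (through `registerEventTimeTimer` or `registerProcessingTimeTimer` on the timer service) creates one timer per record, whereas rounding the target time to a coarser granularity lets repeated registrations collapse. The Flink documentation calls this timer coalescing. The toy `TimerSet` below stands in for the timer service. It, the record pattern, and the ten keys are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy comparison of per-record timers versus timers rounded to a coarser granularity. */
final class TimerCoalescing {

    /** Stand-in for a timer service that keeps one timer per (key, timestamp). */
    static final class TimerSet {
        private final Set<String> timers = new HashSet<>();

        void register(String key, long timestampMillis) {
            timers.add(key + "@" + timestampMillis); // duplicates collapse
        }

        int size() {
            return timers.size();
        }
    }

    /** Rounds a non-negative timestamp up to the next multiple of the granularity. */
    static long roundUpTo(long timestampMillis, long granularityMillis) {
        return ((timestampMillis + granularityMillis - 1) / granularityMillis) * granularityMillis;
    }

    /** Number of live timers after registering one timer per record. */
    static int timersFor(int records, long delayMillis, long granularityMillis) {
        TimerSet timers = new TimerSet();
        for (int i = 0; i < records; i++) {
            long eventTime = i;                   // hypothetical: one record per millisecond
            long target = eventTime + delayMillis;
            timers.register("key-" + (i % 10), roundUpTo(target, granularityMillis));
        }
        return timers.size();
    }

    public static void main(String[] args) {
        System.out.println("exact timestamps:   " + timersFor(100_000, 60_000, 1));
        System.out.println("rounded to seconds: " + timersFor(100_000, 60_000, 1_000));
    }
}
```

Timers are part of the checkpointed state, so a count that grows with every record would be consistent with both the growing state and the checkpoint trouble described earlier in the thread.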


Re: Setting source vs sink vs window parallelism with data increase

Piotr Nowojski-3
No problem and it’s good to hear that you managed to solve the problem.

Piotrek 
