Parallelizing a tumbling group window


Parallelizing a tumbling group window

Colin Williams
Hello,

I've inherited some flink application code.

We're currently creating a table using a tumbling window SQL query similar to the first example in


Where each generated SQL query looks something like

SELECT
  measurement,
  `tag_AppId`(tagset),
  P99(`field_Adds`(fieldset)),
  TUMBLE_START(rowtime, INTERVAL '10' MINUTE)
FROM LINES
GROUP BY
  measurement,
  `tag_AppId`(tagset),
  TUMBLE(rowtime, INTERVAL '10' MINUTE)


We are also using a UDFAGG (user-defined aggregate) function in some of the queries, which I think could be cleaned up and optimized a bit (it uses Scala types and is possibly not well implemented).
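
For reference, the aggregate has roughly the shape below. This is only a minimal sum-style sketch, not our actual implementation; it just shows the AggregateFunction structure, with a Java boxed result type and a plain accumulator class instead of Scala types:

import java.lang.{Long => JLong}
import org.apache.flink.table.functions.AggregateFunction

// Accumulator as a dedicated mutable class.
class SumAcc {
  var sum: Long = 0L
}

// Minimal sum-style aggregate; our real UDFAGG does something different.
class MySum extends AggregateFunction[JLong, SumAcc] {
  override def createAccumulator(): SumAcc = new SumAcc

  // Called by the runtime for every input row; resolved by reflection,
  // so it is not an override of the base class.
  def accumulate(acc: SumAcc, value: JLong): Unit = {
    if (value != null) acc.sum += value.longValue()
  }

  override def getValue(acc: SumAcc): JLong = acc.sum
}

// Registered so the generated SQL string can reference it, e.g.
// tEnv.registerFunction("UDFAGG", new MySum)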

We then turn the result table back into a DataStream using toAppendStream, and eventually add a derived stream to a sink. We've set the TimeCharacteristic to event-time processing.
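
For context, the surrounding code looks roughly like the following. This is a simplified sketch rather than our actual code; the element type, field names, and rowtime handling are approximations:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Placeholder element type standing in for the parsed input lines.
case class Line(measurement: String, tagset: String, fieldset: String)

def buildPipeline(env: StreamExecutionEnvironment,
                  lines: DataStream[Line],
                  generatedQuery: String,
                  sink: SinkFunction[Row]): Unit = {
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tEnv = TableEnvironment.getTableEnvironment(env)

  // Register the stream as table LINES and declare the event-time attribute;
  // assumes timestamps/watermarks were assigned on `lines` upstream.
  tEnv.registerDataStream("LINES", lines,
    'measurement, 'tagset, 'fieldset, 'rowtime.rowtime)

  // The generated query string shown above (sqlQuery() on Flink 1.4, sql() on earlier versions).
  val table = tEnv.sqlQuery(generatedQuery)

  // Back to an append stream, then on to the derived stream and the sink.
  val results: DataStream[Row] = table.toAppendStream[Row]
  results.addSink(sink)
}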

In some streaming scenarios everything is working fine with a parallelism of 1, but in others it appears that we can't keep up with the event source.

We are now investigating how to enable parallelism specifically for the SQL table query or the aggregator.

Can anyone suggest a good way to go about this? It wasn't clear from the documentation.

Best,

Colin Williams




Re: Parallelizing a tumbling group window

Timo Walther
Hi Colin,

unfortunately, selecting the parallelism for parts of a SQL query is not supported yet. By default, tumbling window operators use the default parallelism of the environment. Simple project and select operations have the same parallelism as the inputs they are applied on.

I think the easiest solution so far is to explicitly set the parallelism of operators that are not part of the Table API and use the environment's parallelism to scale the SQL query.
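
For example, roughly like this (only a sketch; the concrete source, sink, and parallelism values are placeholders):

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// `source` and `sink` stand in for the Kafka consumer and the actual sink.
def configureParallelism(env: StreamExecutionEnvironment,
                         source: SourceFunction[String],
                         sink: SinkFunction[String]): Unit = {
  // The environment's default parallelism scales the SQL query
  // (the window/aggregation operators generated from it).
  env.setParallelism(4)

  // Operators that are not part of the Table API get an explicit parallelism.
  val lines: DataStream[String] = env
    .addSource(source)
    .setParallelism(2)      // e.g. one subtask per Kafka partition

  // ... register `lines` as a table, run the SQL query, toAppendStream ...

  lines                     // placeholder for the converted result stream
    .addSink(sink)
    .setParallelism(1)
}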

I hope that helps.

Regards,
Timo



Re: Parallelizing a tumbling group window

Colin Williams
Hi Timo and flink-user,


It's been a few weeks and we've made some changes to the application mentioned in this email; we've also updated to Flink 1.4. We are using the SQL / Table API with a tumbling window and a user-defined aggregate function to generate a SQL query string like:


SELECT
  measurement,
  `tag_AppId`(tagset),
  UDFAGG(`field_Adds`(fieldset)),
  TUMBLE_START(rowtime, INTERVAL '10' MINUTE)
FROM LINES
GROUP BY
  measurement,
  `tag_AppId`(tagset),
  TUMBLE(rowtime, INTERVAL '10' MINUTE)




I've experimented with the parallelism of the operators and with setting the environment's parallelism, as suggested. I've been setting parallelism values of 2 or 4 on all operators except the consumer and the sink.


For some jobs with large Kafka source topics we experience backpressure under load and see some lag. But when trying to address this via parallelism, so far I've only seen performance degrade with the increased parallelism settings.


Furthermore, the suspect jobs are grouping by a field of constant values. These jobs usually have 40,000 or so grouped records entering the aggregator for each minute window.



I would have thought that the tumbling windows would allow the job to process each window in a separate task slot, parallelizing across windows. But maybe that's not happening?



Can you help us understand why parallelizing the job only degrades performance, and what I can do to change this?




Happy New Year!



Colin Williams

Re: Parallelizing a tumbling group window

Fabian Hueske
Hi Colin,

There are two things that come to my mind:

1) You mentioned that the "suspect jobs are grouping by a field of constant values". Does that mean that the grouping key is always constant? Flink parallelizes the window computation per key, i.e., there is one thread per key, so a constant key means the whole aggregation runs in a single parallel instance. Although it would be possible to perform pre-aggregations, this is not done yet. There is an effort to add support for this to the DataStream API [1]; the Table API will hopefully leverage it once it has been added. A manual workaround along these lines is sketched after this list.
2) Another reason for backpressure can be non-aligned watermarks, i.e., the watermarks of different partitions diverge too much from each other. In this case, windows cannot be finalized because everything is aligned to the lowest watermark.
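
Regarding 1), just to illustrate what a manual workaround could look like on the DataStream API (a sketch with hypothetical names and types; it only works for decomposable aggregates such as sum or count, not for an exact percentile): you can add an artificial bucket key to spread the constant key across subtasks, compute partial aggregates per bucket, and merge the few partial results in a second window.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random

// Hypothetical record type standing in for the parsed LINES rows.
case class Line(measurement: String, appId: String, adds: Long)

// Assumes timestamps/watermarks are already assigned on `lines` (event time).
def preAggregatedSum(lines: DataStream[Line]): DataStream[(String, String, Long)] = {
  val buckets = 8 // number of artificial sub-keys to spread the constant key over

  lines
    // Step 1: partial sums, parallelized over the artificial bucket key.
    .map(l => (l.measurement, l.appId, Random.nextInt(buckets), l.adds))
    .keyBy(t => (t._1, t._2, t._3))
    .timeWindow(Time.minutes(10))
    .reduce((a, b) => (a._1, a._2, a._3, a._4 + b._4))
    // Step 2: merge the (at most `buckets`) partial results per window.
    .map(t => (t._1, t._2, t._4))
    .keyBy(t => (t._1, t._2))
    .timeWindow(Time.minutes(10))
    .reduce((a, b) => (a._1, a._2, a._3 + b._3))
}

The second window sees at most `buckets` records per key and window, so it stays cheap even though it effectively runs in a single parallel instance for a constant key.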

Hope this helps to clarify things.

Best, Fabian


Re: Parallelizing a tumbling group window

Colin Williams
Thanks for the reply. Unfortunately that project was unexpectedly cancelled, though for unrelated reasons. I was happy to work on it and hopefully gained some insight. I have another, unrelated question today about Elasticsearch sinks, and will ask it in a separate thread.
