Hello,
I've inherited some Flink application code. We're currently creating a table using a tumbling-window SQL query similar to the first example in the documentation, where each generated SQL query looks something like:

SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
       TUMBLE_START(rowtime, INTERVAL '10' MINUTE)
FROM LINES
GROUP BY measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)

We are also using a UDFAGG function in some of the queries, which I think could be cleaned up and optimized a bit (it uses Scala types and is possibly not well implemented). We then turn the result table back into a DataStream using toAppendStream, and eventually add a derived stream to a sink. We've configured the TimeCharacteristic for event-time processing.

In some streaming scenarios everything works fine with a parallelism of 1, but in others it appears that we can't keep up with the event source. So now we are investigating how to enable parallelism specifically on the SQL table query or the aggregator. Can anyone suggest a good way to go about this? It wasn't clear from the documentation.

Best,

Colin Williams
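P.S. For context, the wiring looks roughly like the sketch below. This is simplified and not our actual code: the source, the example values, and the UDF registrations are stand-ins.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object PipelineSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Stand-in for the real Kafka source; the real stream has its own
    // timestamp/watermark assignment.
    val lines: DataStream[(String, String, String)] = env
      .fromElements(("cpu", "AppId=42", "Adds=3.0"))
      .assignAscendingTimestamps(_ => System.currentTimeMillis())

    // `tag_AppId`, `field_Adds` and the P99 aggregate are our own UDFs / UDAGGs;
    // their registration via tableEnv.registerFunction(...) is omitted here.
    tableEnv.registerDataStream("LINES", lines,
      'measurement, 'tagset, 'fieldset, 'rowtime.rowtime)

    // tableEnv.sqlQuery(...) from Flink 1.4 on; sql(...) on earlier versions.
    val result = tableEnv.sql(
      """SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
        |       TUMBLE_START(rowtime, INTERVAL '10' MINUTE)
        |FROM LINES
        |GROUP BY measurement, `tag_AppId`(tagset),
        |         TUMBLE(rowtime, INTERVAL '10' MINUTE)""".stripMargin)

    // Back to a DataStream and on to the sink (the real sink is omitted).
    val out: DataStream[Row] = result.toAppendStream[Row]
    out.print()

    env.execute("tumbling window aggregation")
  }
}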
Hi Colin,
unfortunately, selecting the parallelism for parts of a SQL query is not supported yet. By default, tumbling window operators use the default parallelism of the environment. Simple project and select operations have the same parallelism as the inputs they are applied on.

I think the easiest solution so far is to explicitly set the parallelism of operators that are not part of the Table API and to use the environment's parallelism to scale the SQL query.

I hope that helps.

Regards,
Timo
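P.S. A rough sketch of what I mean, with purely illustrative values and stand-in operators rather than your actual job:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

object ParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // The environment's default parallelism is what the operators generated by
    // the Table API (including the tumbling window aggregation) will run with.
    env.setParallelism(4)

    // Operators outside the Table API can be pinned explicitly.
    val raw: DataStream[String] = env
      .socketTextStream("localhost", 9999)  // stand-in for the Kafka consumer
      .map(line => line.trim)               // e.g. a parsing step
      .setParallelism(1)                    // pinned, independent of the environment default

    // ... register `raw` as a table, run the SQL query, and convert the
    //     result back to a DataStream here ...

    raw
      .print()           // stand-in for the real sink
      .setParallelism(1) // the sink can be pinned as well

    env.execute("parallelism sketch")
  }
}

So everything the SQL query translates to scales with the environment's parallelism (4 in the sketch), while the explicitly pinned operators keep their own setting.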
Hi Timo and flink-user,

It's been a few weeks and we've made some changes to the application mentioned in this email. We've also updated to Flink 1.4.

We are using the SQL / Table API with a tumbling window and a user-defined aggregate function to generate a SQL query string like:

SELECT measurement, `tag_AppId`(tagset), UDFAGG(`field_Adds`(fieldset)),
       TUMBLE_START(rowtime, INTERVAL '10' MINUTE)
FROM LINES
GROUP BY measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)

I've experimented with the parallelism of the operators and with setting the environment's parallelism, as suggested, setting parallelism values of 2 or 4 on all operators except the consumer and sink. For some jobs with large Kafka source topics we experience back pressure under load and see some lag, but when trying to address that via parallelism, so far I've only seen badly degraded performance from the increased parallelism settings.

Furthermore, the suspect jobs are grouping by a field of constant values, and these jobs usually have 40,000 or so grouped records entering the aggregator for each minute window. I would have thought that the tumbling windows would allow the job to process each window in another task slot, parallelizing across windows. But maybe that's not happening?

Can you help us understand why parallelizing the job only degrades performance, and what I can do to change this?

Happy New Year!

Colin Williams
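P.S. In case it matters for the discussion, the user-defined aggregate we call as UDFAGG has roughly the shape below. A trivial average stands in for the real logic here, so this only shows the structure, not our actual function:

import java.lang.{Double => JDouble}
import org.apache.flink.table.functions.AggregateFunction

// Accumulator for the aggregate: running sum and count.
class AvgAccumulator {
  var sum: Double = 0.0
  var count: Long = 0L
}

// Shape of a Table API user-defined aggregate function; the average is only
// a placeholder for what our UDFAGG actually computes.
class SimpleAvg extends AggregateFunction[JDouble, AvgAccumulator] {

  override def createAccumulator(): AvgAccumulator = new AvgAccumulator

  // Called once per input row of the group/window (resolved by reflection).
  def accumulate(acc: AvgAccumulator, value: JDouble): Unit = {
    acc.sum += value
    acc.count += 1
  }

  // Optional, but lets the runtime merge partial accumulators.
  def merge(acc: AvgAccumulator, others: java.lang.Iterable[AvgAccumulator]): Unit = {
    val it = others.iterator()
    while (it.hasNext) {
      val other = it.next()
      acc.sum += other.sum
      acc.count += other.count
    }
  }

  override def getValue(acc: AvgAccumulator): JDouble =
    if (acc.count == 0L) 0.0 else acc.sum / acc.count
}

// Registered once, then callable from the generated SQL as UDFAGG(...):
// tableEnv.registerFunction("UDFAGG", new SimpleAvg)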
Hi Colin,

Hope this helps to clarify things.

Best,
Fabian
Thanks for the reply. Unfortunately that project was unexpectedly cancelled, though for other reasons. I was happy to work on it, and hopefully gained some insight.

I have another, unrelated question today about Elasticsearch sinks, and will ask it in a separate thread.