Unable to launch job with 100 SQL queries in yarn cluster


Unable to launch job with 100 SQL queries in yarn cluster

Adrian Hains
Hi,
We are having trouble scaling up Flink to execute a collection of SQL queries on a YARN cluster. Has anyone run this kind of workload on a cluster? Any tips on how to get past this issue?

With a high number of Flink SQL queries (100 instances of the query at the bottom of this message), the Flink command-line client fails with a “JobManager did not respond within 600000 ms” error on a YARN cluster. The JobManager log shows nothing after the last TaskManager starts, which suggests it is hung (creating the ExecutionGraph?). The same set of queries works as a standalone program locally, and I can also successfully launch and process 2 instances of the query in cluster mode.

When attempting 10 query instances in cluster mode, we are able to submit, but the job errors out with “Insufficient number of network buffers: required 725, but only 135 available. The total number of network buffers is currently set to 61035 of 32768 bytes each.” Surprisingly, with a query count of 1, only 15000 network buffers are needed, so the required buffer count seems to scale quickly with the number of queries.
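For reference, the network-buffer knobs we know of in flink-conf.yaml are the ones below (illustrative values only, not settings we believe are correct; byte-valued keys as in Flink 1.4/1.5):

  taskmanager.network.memory.fraction: 0.2
  taskmanager.network.memory.min: 134217728
  taskmanager.network.memory.max: 4294967296

But raising these feels like chasing the symptom if the requirement really grows with every added query.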

Note: each Row in structStream contains 515 columns (a very sparse table; >500 are null for each row), including a column that holds the raw message.

In the YARN cluster we specify 18 GB for the TaskManager, 18 GB for the JobManager, 5 slots, and a parallelism of 725 (the number of partitions in our Kafka source).
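These settings correspond to a launch command roughly like the one below (the jar name is a placeholder):

  ./bin/flink run -m yarn-cluster -ys 5 -yjm 18432 -ytm 18432 -p 725 our-queries.jar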

The query is a simple filter and aggregation:
select count(*), 'idnumber' as criteria, Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source
from structStream
where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType' and Outcome='Success'
group by tumble(proctime, INTERVAL '1' SECOND), Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source


The code is included in https://issues.apache.org/jira/browse/FLINK-9166

thanks!
-a



Re: Unable to launch job with 100 SQL queries in yarn cluster

Fabian Hueske
Hi Adrian,

Thanks for reaching out to the community. I don't think this is an issue with Flink's SQL support; SQL queries are translated into regular streaming (or batch) jobs.

The JM might just be overloaded by too many jobs. Since you are running in a YARN environment, it might make sense to start more Flink clusters and distribute the queries across more JMs.
In the upcoming Flink 1.5 release, scheduling and the integration with resource managers will be completely reworked, which will make a one-cluster-per-job deployment easier to maintain.
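For example, something along these lines with the CLI (detached mode; jar name and batch argument are placeholders), so each batch of queries gets its own YARN cluster and JobManager:

  for i in 1 2 3 4 5; do
    ./bin/flink run -m yarn-cluster -yd -ys 5 -yjm 4096 -ytm 4096 queries.jar --batch $i
  done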

I've added some details to FLINK-9166.

Best,
Fabian
