I wrote a Flink SQL app with the following topology.

    KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
    KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
    ...
    KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink

I have a dozen TableSources and tens of SQL queries. As a result, the number of JDBCAppendTableSinks times the parallelism, that is, the number of concurrent connections to the database, is too large for the database server to handle. So I tried to union the DataStreams before connecting them to the TableSink.

    KafkaJsonTableSource -> SQL -> toAppendStream -> Map \
    KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union -> JDBCAppendTableSink
    ...                                                   /
    KafkaJsonTableSource -> SQL -> toAppendStream -> Map

With this strategy, job submission failed with an OversizedPayloadException reporting a payload of 104 MB. Increasing akka.framesize avoids the exception, but then job submission hangs and times out.

I can't understand why a simple union operator would serialize to such a large message. Can I avoid this problem? Or can I change some configuration to fix the submission timeout?

Regards,
Bill
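(Aside, for readers hitting the same error: akka.framesize is a cluster-wide setting read from flink-conf.yaml on the JobManager and TaskManagers; its default is 10485760b, about 10 MB. Raising it, as mentioned above, would look roughly like the sketch below; the value shown is purely illustrative and only needs to exceed the reported payload size.)

    # flink-conf.yaml (illustrative value, chosen to exceed the ~104 MB payload)
    akka.framesize: 209715200b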
Hi,
Could you provide more details about your queries and setup? Logs could be helpful as well.

Piotrek
Thank you for your response. It occurs both in a standalone cluster and in a YARN cluster. I am trying to remove the business code and reproduce it with a minimal demo.
Hi Bill,

The size of the program depends on the number and complexity of the SQL queries that you are submitting. Each query might be translated into a sequence of multiple operators. Each operator carries a string with generated code that will be compiled on the worker nodes. The size of that code depends on the number of fields in the schema.
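(As a rough way to see how much a single query contributes, one can print what the Table API generates for it. A minimal sketch, assuming a StreamTableEnvironment named tEnv as in the code later in the thread; the query string is a placeholder:)

    // Prints the abstract syntax tree, the optimized logical plan,
    // and the physical execution plan for one translated query.
    val table = tEnv.sqlQuery("SELECT ...")  // placeholder query
    println(tEnv.explain(table))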
I understand complex SQL queries would be translated into large DAGs.
However, the submission succeeds in my case if I don't use the union operator. It might be a potential bug related to it. For example, the following code submits successfully within the default akka.framesize limit.

    val sqls: Seq[String] = ...
    val sink: JDBCAppendTableSink = ...

    sqls foreach { sql =>
      val table = tEnv.sqlQuery(sql)
      val outputStream = tEnv.toAppendStream[Row](table) map { ... }
      tEnv.fromDataStream(outputStream).writeToSink(sink)
    }

If I union these output streams and send the result to a single sink, the serialized job grows to about 100 MB.

    val outputStream = sqls map { sql =>
      val table = tEnv.sqlQuery(sql)
      tEnv.toAppendStream[Row](table) map { ... }
    } reduce { (a, b) => a union b }

    tEnv.fromDataStream(outputStream).writeToSink(sink)

I failed to reproduce it without the actual table schemas and SQL queries from my production setup. In the end I wrote my own JDBC sink with connection pooling to mitigate the problem. Maybe someone familiar with the implementation of the union operator can figure out what's going wrong.
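(A connection-pooled JDBC sink along the lines mentioned above could be sketched as below. This is not the code from this thread: the pooling library (HikariCP is assumed and would need to be a dependency), the table name, columns, and pool size are all placeholders. It would be attached with outputStream.addSink(new PooledJdbcSink(...)) instead of writeToSink(sink).)

    import java.sql.Connection
    import javax.sql.DataSource

    import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.flink.types.Row

    // Minimal sketch of a JDBC sink that keeps a small connection pool per
    // parallel task, so the total number of database connections stays bounded.
    class PooledJdbcSink(jdbcUrl: String, user: String, password: String)
        extends RichSinkFunction[Row] {

      @transient private var dataSource: DataSource = _

      override def open(parameters: Configuration): Unit = {
        val config = new HikariConfig()
        config.setJdbcUrl(jdbcUrl)
        config.setUsername(user)
        config.setPassword(password)
        config.setMaximumPoolSize(2)  // illustrative limit per parallel task
        dataSource = new HikariDataSource(config)
      }

      override def invoke(value: Row): Unit = {
        val connection: Connection = dataSource.getConnection
        try {
          // "my_table" and its two columns are placeholders for the real schema.
          val statement = connection.prepareStatement(
            "INSERT INTO my_table (col_a, col_b) VALUES (?, ?)")
          statement.setObject(1, value.getField(0))
          statement.setObject(2, value.getField(1))
          statement.executeUpdate()
          statement.close()
        } finally {
          connection.close()  // returns the connection to the pool
        }
      }

      override def close(): Unit = {
        dataSource match {
          case ds: HikariDataSource => ds.close()
          case _ =>
        }
      }
    }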
Can you share the operator plan (StreamExecutionEnvironment.getExecutionPlan()) for both cases?

Thanks,
Fabian
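(For anyone following along, the plan Fabian asks for can be dumped before the job is submitted. A minimal sketch, assuming a StreamExecutionEnvironment named env as in the code earlier in the thread:)

    // After all sinks have been defined (either the per-query or the union
    // variant), print the job's execution plan as a JSON string.
    println(env.getExecutionPlan())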