Extremely large job serialization produced by union operator

杨力
I wrote a Flink SQL app with the following topology:

KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink
...
KafkaJsonTableSource -> SQL -> toAppendStream -> Map -> JDBCAppendTableSink

I have a dozen TableSources and tens of SQL queries. As a result, the number of JDBCAppendTableSinks multiplied by the parallelism, which is the number of concurrent connections to the database (for example, 30 sinks at a parallelism of 20 would open 600 connections), is too large for the database server to handle. So I tried to union the DataStreams before connecting them to the TableSink.

KafkaJsonTableSource -> SQL -> toAppendStream -> Map
                                                      \
KafkaJsonTableSource -> SQL -> toAppendStream -> Map --- union -> JDBCAppendTableSink
...                                                   /
KafkaJsonTableSource -> SQL -> toAppendStream -> Map

With this strategy, job submission failed with an OversizedPayloadException for a 104 MB message. Increasing akka.framesize avoids the exception, but then the job submission hangs and times out.
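
For reference, raising that limit means setting something like the following in flink-conf.yaml; the exact value here is only an illustration sized comfortably above the 104 MB payload, not a recommendation:

# flink-conf.yaml: raise Akka's maximum frame size
# (the default is 10485760b, i.e. 10 MB)
akka.framesize: 209715200b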

I can't understand why a simple union operator would serialize to such a large message. Can I avoid this problem?
Or can I change some configuration to fix the submission timeout?

Regards,
Bill

Re: Extremely large job serialization produced by union operator

Piotr Nowojski
Hi,

Could you provide more details about your queries and setup? Logs could be helpful as well.

Piotrek

Re: Extremely large job serialization produced by union operator

杨力
Thank you for your response. It occurs both in a standalone cluster and in a YARN cluster. I am trying to strip out the business code and reproduce it with a minimal demo.

Re: Extremely large job serialization produced by union operator

Fabian Hueske
Hi Bill,

The size of the program depends on the number and complexity of the SQL queries you are submitting.
Each query may be translated into a sequence of multiple operators. Each operator holds a string of generated code that is compiled on the worker nodes; the size of that code depends on the number of fields in the schema.
Operators and generated code are not shared across queries.
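
To see how much a single query contributes, a minimal sketch along the following lines prints the per-query plan and the program's JSON plan before submission. The table, schema, and query are hypothetical stand-ins, and it assumes the Flink 1.4-era Scala APIs:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object ExplainPlans {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Hypothetical stand-in for one of the Kafka-backed tables.
    tEnv.registerDataStream("t", env.fromElements((1, "a"), (2, "b")), 'id, 'name)

    val table = tEnv.sqlQuery("SELECT id, name FROM t WHERE id > 1")

    // Logical and physical plans of this one query, showing the
    // operators it expands into.
    println(tEnv.explain(table))

    // JSON plan of the whole program: a rough proxy for how many
    // operators (each carrying generated code) will be serialized.
    tEnv.toAppendStream[(Int, String)](table).print()
    println(env.getExecutionPlan)
  }
}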

Best, Fabian

Re: Extremely large job serialization produced by union operator

杨力
I understand that complex SQL queries are translated into large DAGs.
However, in my case the submission succeeds as long as I don't use the union operator, so this might be a bug related to union. For example, the following code submits successfully within the default akka.framesize limit.

val sqls: Seq[String] = ...
val sink: JDBCAppendTableSink = ...

sqls foreach {
  sql =>
    val table = tEnv.sqlQuery(sql)
    val outputStream = tEnv.toAppendStream[Row](table) map {
      ...
    }
    tEnv.fromDataStream(outputStream).writeToSink(sink)
}

If I union these output streams and send the result to a single sink, the serialized job grows to about 100 MB.

val outputStream = sqls map {
  sql =>
    val table = tEnv.sqlQuery(sql)
    tEnv.toAppendStream[Row](table) map {
      ...
    }
} reduce {
  (a, b) => a union b
}
tEnv.fromDataStream(outputStream).writeToSink(sink)

I failed to reproduce it without the actual table schemas and SQL queries used in my production environment. In the end I wrote my own JDBC sink with connection pooling to mitigate the problem. Maybe someone familiar with the implementation of the union operator can figure out what's going wrong.
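
For what it's worth, the replacement sink looks roughly like the sketch below. It is a minimal illustration rather than the production code; HikariCP, the connection settings, the pool size, and the insert statement are all assumptions:

import java.sql.{Connection, PreparedStatement}

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.types.Row

// One pool per TaskManager JVM: sink subtasks share it, so the database
// sees at most maximumPoolSize connections per TaskManager instead of
// (number of sinks) * parallelism.
object JdbcPool {
  lazy val dataSource: HikariDataSource = {
    val cfg = new HikariConfig()
    cfg.setJdbcUrl("jdbc:postgresql://db-host:5432/app") // hypothetical URL
    cfg.setUsername("app")                               // hypothetical credentials
    cfg.setPassword("secret")
    cfg.setMaximumPoolSize(8)                            // hard cap per TaskManager
    new HikariDataSource(cfg)
  }
}

class PooledJdbcSink(insertSql: String) extends RichSinkFunction[Row] {

  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    // Borrow a connection from the shared pool on the worker JVM.
    conn = JdbcPool.dataSource.getConnection
    stmt = conn.prepareStatement(insertSql)
  }

  override def invoke(row: Row): Unit = {
    // Bind each field of the Row positionally and execute.
    for (i <- 0 until row.getArity) stmt.setObject(i + 1, row.getField(i))
    stmt.executeUpdate()
  }

  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close() // returns the connection to the pool
  }
}

The unioned stream then attaches with something like outputStream.addSink(new PooledJdbcSink("INSERT INTO sink_table VALUES (?, ?)")), where the statement is again a placeholder.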

Re: Extremely large job serialization produced by union operator

Fabian Hueske
Can you share the operator plan (StreamExecutionEnvironment.getExecutionPlan()) for both cases?

Thanks, Fabian
