flink OutOfMemoryError: GC overhead limit exceeded

flink OutOfMemoryError: GC overhead limit exceeded

淘宝龙安
Hi all,

I have a batch job that reads data from PostgreSQL via JDBC. When the record count is greater than 100,000, the Flink job fails with OutOfMemoryError: GC overhead limit exceeded.

The TaskManager memory is 16GB

-yjm 5120 -ytm 16384

[screenshot attachment: image.png]


My config is as follows. Can anybody help me?
JDBCInputFormat.buildJDBCInputFormat()
        .setDBUrl(configDO.getJDBCUrl())
        .setUsername(configDO.getDbUser())
        .setPassword(configDO.getDbPassword())
        .setFetchSize(JobConfig.jdbcFetchSize)
        .setDrivername(configDO.getJdbcDriver())
        .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
        .setQuery(sql)
        .finish()

Re: flink OutOfMemoryError: GC overhead limit exceeded

vino yang
Hi longan,

As a preliminary evaluation, 100,000+ records alone should not cause an OOM. Can you give more details about your job, e.g. the job graph or business logic (how many and what kinds of operators do you use?), how many TM containers you run, log files, and so on?

What's more, besides monitoring the memory metrics, you can view GC information via two config options provided by Flink itself. [1]

Best,
Vino
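A sketch of the kind of configuration this refers to, in flink-conf.yaml (the exact option names from [1] are not reproduced here; the names below are a guess for Flink 1.9 and should be verified against the docs):

    taskmanager.debug.memory.log: true            # guessed option name; periodically logs TM heap / non-heap / direct memory usage
    taskmanager.debug.memory.log-interval: 5000   # guessed option name; logging interval in milliseconds

Independently of those, GC activity can always be logged via plain JVM flags, for example:

    env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/taskmanager-gc.log"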


Re: flink OutOfMemoryError: GC overhead limit exceeded

淘宝龙安
Hi yanghua,

Thanks for your response. My scenario is very simple.

I have two tables in the database.

table A (user_info)
--------------------------------
id      | varchar |
name    | varchar |
age     | numeric |
--------------------------------


Table B (order_info)
--------------------------------
id      | varchar |
user_id | varchar |
price   | numeric |
--------------------------------




I read these two tables into Flink via JDBC and then join them. My code looks like this:

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

        // register table user_info with JDBCInputFormat
        fsTableEnv.registerTableSource("user_info", new JdbcTableSource(
                fieldNames,
                types,
                JDBCInputFormat.buildJDBCInputFormat()
                        .setDBUrl(configDO.getJDBCUrl())
                        .setUsername(configDO.getDbUser())
                        .setPassword(configDO.getDbPassword())
                        .setFetchSize(10000)
                        .setDrivername(configDO.getJdbcDriver())
                        .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
                        .setQuery("select id, name, age from user_info")
                        .finish()
        ));

        // register table order_info with JDBCInputFormat
        fsTableEnv.registerTableSource("order_info", new JdbcTableSource(
                fieldNames,
                types,
                JDBCInputFormat.buildJDBCInputFormat()
                        .setDBUrl(configDO.getJDBCUrl())
                        .setUsername(configDO.getDbUser())
                        .setPassword(configDO.getDbPassword())
                        .setFetchSize(10000)
                        .setDrivername(configDO.getJdbcDriver())
                        .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
                        .setQuery("select id, user_id, price from order_info")
                        .finish()
        ));

        // register an Elasticsearch sink
        fsTableEnv.registerTableSink("output_table",
                new ElasticSearchTableSink.ElasticSearchTableSinkBuilder()
                        .hosts(configDO.getElasticSearchServer())
                        .index(ElasticsearchConfigUtils.getIndex(configDO, dateTime))
                        .docType("_doc")
                        .fieldNames(DatabaseTypesUtils.getFieldNames(configDO.getOutput()))
                        .fieldTypes(DatabaseTypesUtils.getTableTypes(configDO.getOutput()))
                        .build());

        // then join these two tables
        fsTableEnv.sqlUpdate("insert into output_table select user_info.id as user_id, order_info.id as order_id, user_info.name, order_info.price from user_info join order_info on order_info.user_id = user_info.id");
        fsEnv.execute(taskName);
 
 

Then I run it on a YARN cluster:

/app/flink-1.9.1/bin/flink run -m yarn-cluster -p 4 -ys 4 -yn 4 -yjm 5120 -ytm 16384 my-flink-job.jar
Re: flink OutOfMemoryError: GC overhead limit exceeded

淘宝龙安
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data4/HDATA/yarn/local/usercache/flink/appcache/application_1573355697642_32093/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data1/app/hadoop-2.7.3-snappy-32core12disk/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "I/O dispatcher 3" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.HashMap$KeySet.iterator(HashMap.java:917)
at java.util.HashSet.iterator(HashSet.java:173)
at java.util.Collections$UnmodifiableCollection$1.<init>(Collections.java:1039)
at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "I/O dispatcher 28" Exception in thread "I/O dispatcher 32" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "I/O dispatcher 24" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "I/O dispatcher 26" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "I/O dispatcher 12" Exception in thread "I/O dispatcher 17" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "I/O dispatcher 18" java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:748)
java.lang.OutOfMemoryError: GC overhead limit exceeded

Re: flink OutOfMemoryError: GC overhead limit exceeded

vino yang
Hi 龙安,

First, I have two questions.

1) You said this is a batch job, yet you used the streaming env and the streaming table env.

2) From the startup command, I saw the "-yn" config option, which is no longer supported since Flink 1.8+. I guess you only started one TM container (-p/-s=1). If I am wrong, please correct me.

Can we first align on these two questions?

Best,
Vino
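For reference on point 1), here is a minimal sketch of what a bounded/batch Table setup looks like in Flink 1.9 with the Blink planner. This is illustrative only, not code from this thread; in batch mode the planner treats the JDBC inputs as bounded and can choose batch join strategies instead of keeping both join inputs in streaming state. (The old planner's batch equivalent would be BatchTableEnvironment.create(ExecutionEnvironment.getExecutionEnvironment()).)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchTableEnvSketch {
    public static void main(String[] args) {
        // Blink planner, batch mode: sources are treated as bounded inputs.
        EnvironmentSettings batchSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment batchTableEnv = TableEnvironment.create(batchSettings);

        // Register the JDBC sources and the Elasticsearch sink here, then run the
        // same "insert into output_table select ... join ..." statement via sqlUpdate().
    }
}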


Re: flink OutOfMemoryError: GC overhead limit exceeded

vino yang
Hi 龙安,

Sorry, I did not know you used the Blink planner.

> Before using NumericBetweenParametersProvider, the job read data from the database using just one TaskManager, even though I have more than one TM.

Regarding using only one TM, it seems to be a known issue. [1]

Best,
Vino


淘宝龙安 <[hidden email]> wrote on Tuesday, November 19, 2019 at 9:26 PM:
1) In Flink 1.9.0, I found that BatchTableSource is deprecated. Then I found InputFormatTableSource, whose isBounded() method decides whether the job is a batch job.

In the document above, it says Blink treats batch jobs as a special case of streaming.

So I guessed that the streaming env is the same as BatchTableEnvironment, and it works fine. I don't know whether this is correct or not.

2) Yes, I made a mistake. When I corrected it, it still did not work, just like before. My new startup command is /app/flink-1.9.1/bin/flink run -m yarn-cluster -p 8 -ys 4 -yjm 5120 -ytm 16384 my-flink-job.jar

And finally I solved this problem with NumericBetweenParametersProvider:

            jdbcBuilder.setParametersProvider(new NumericBetweenParametersProvider(
                    JobConfig.jdbcFetchSize,
                    configDO.getBatchStart(),
                    configDO.getBatchEnd()));

But I don't know why.

Before using NumericBetweenParametersProvider, the job read data from the database using just one TaskManager, even though I have more than one TM.
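
A note on the "why": in Flink 1.9, JDBCInputFormat without a ParameterValuesProvider produces, as far as I can tell, only a single input split, so the entire query result is read by one subtask on one TaskManager, which matches the behavior observed above. With NumericBetweenParametersProvider plus a parameterized query, the [batchStart, batchEnd] range is cut into chunks of jdbcFetchSize values, each chunk becomes its own input split, and parallel subtasks (and therefore several TMs) can read them concurrently, so no single task has to hold the whole table. A sketch, reusing the identifiers from the snippets above and assuming the split column is numeric (getBatchStart()/getBatchEnd() suggest it is); treat it as illustrative rather than exact production code:

// needs: org.apache.flink.api.java.io.jdbc.JDBCInputFormat and
//        org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider (flink-jdbc)
JDBCInputFormat userInfoFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(configDO.getJdbcDriver())
        .setDBUrl(configDO.getJDBCUrl())
        .setUsername(configDO.getDbUser())
        .setPassword(configDO.getDbPassword())
        .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
        // the query needs the two placeholders that each split fills in
        .setQuery("select id, name, age from user_info where id between ? and ?")
        // cuts [batchStart, batchEnd] into ranges of jdbcFetchSize ids; one range per input split
        .setParametersProvider(new NumericBetweenParametersProvider(
                JobConfig.jdbcFetchSize, configDO.getBatchStart(), configDO.getBatchEnd()))
        .finish();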

