Hi all,

I have a batch job that reads data from PostgreSQL with JDBC, built with JDBCInputFormat.buildJDBCInputFormat(). When the record count is greater than 100,000, the Flink job fails with "OutOfMemoryError: GC overhead limit exceeded". The TaskManager memory is 16 GB; my config is -yjm 5120 -ytm 16384. Can anybody help me?
|
Hi longan,

Besides monitoring memory metrics, you can also view GC information with two config options provided by Flink itself. [1]

Best,
Vino

On Sun, Nov 17, 2019 at 1:03 PM, 淘宝龙安 <[hidden email]> wrote:
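If the two options meant here are the TaskManager memory-logging ones, a minimal flink-conf.yaml sketch would be (option names as of Flink 1.9; the interval value is just an example):

    # flink-conf.yaml
    taskmanager.debug.memory.log: true          # periodically log heap/off-heap usage
    taskmanager.debug.memory.log-interval: 5000 # logging interval in milliseconds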
|
Hi yanghua,

Thanks for your response. My scenario is very simple: I have two tables in the database.

Table A (user_info)
--------------------
id   | varchar
name | varchar
age  | numeric
--------------------

Table B (order_info)
--------------------
id      | varchar
user_id | varchar
price   | numeric
--------------------

I read these two tables into Flink via JDBC and then join them. My code looks like this:

    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
        .useOldPlanner()
        .inStreamingMode()
        .build();
    StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

    // register table user_info with JDBCInputFormat
    fsTableEnv.registerTableSource("user_info", new JdbcTableSource(
        fieldNames, types,
        JDBCInputFormat.buildJDBCInputFormat()
            .setDBUrl(configDO.getJDBCUrl())
            .setUsername(configDO.getDbUser())
            .setPassword(configDO.getDbPassword())
            .setFetchSize(10000)
            .setDrivername(configDO.getJdbcDriver())
            .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
            .setQuery("select id, name, age from user_info")
            .finish()
    ));

    // register table order_info with JDBCInputFormat
    fsTableEnv.registerTableSource("order_info", new JdbcTableSource(
        fieldNames, types,
        JDBCInputFormat.buildJDBCInputFormat()
            .setDBUrl(configDO.getJDBCUrl())
            .setUsername(configDO.getDbUser())
            .setPassword(configDO.getDbPassword())
            .setFetchSize(10000)
            .setDrivername(configDO.getJdbcDriver())
            .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
            .setQuery("select id, user_id, price from order_info")
            .finish()
    ));

    // register an Elasticsearch sink
    fsTableEnv.registerTableSink("output_table", new ElasticSearchTableSink.ElasticSearchTableSinkBuilder()
        .hosts(configDO.getElasticSearchServer())
        .index(ElasticsearchConfigUtils.getIndex(configDO, dateTime))
        .docType("_doc")
        .fieldNames(DatabaseTypesUtils.getFieldNames(configDO.getOutput()))
        .fieldTypes(DatabaseTypesUtils.getTableTypes(configDO.getOutput()))
        .build());

    // then join the two tables
    fsTableEnv.sqlUpdate("insert into output_table select user_info.id as user_id, order_info.id as order_id, user_info.name, order_info.price from user_info join order_info on order_info.user_id = user_info.id");

    fsEnv.execute(taskName);

Then I run it on a YARN cluster:

    /app/flink-1.9.1/bin/flink run -m yarn-cluster -p 4 -ys 4 -yn 4 -yjm 5120 -ytm 16384 my-flink-job.jar

On Mon, Nov 18, 2019 at 11:47 AM, vino yang <[hidden email]> wrote:
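One PostgreSQL-specific detail worth checking here: the PostgreSQL JDBC driver only honors the fetch size (that is, streams rows through a server-side cursor) when autocommit is disabled; with autocommit on, it materializes the entire result set in memory no matter what setFetchSize says, which would explain an OOM that only appears once the tables grow. A minimal plain-JDBC sketch of the streaming setup (standard java.sql API; the URL, credentials, and query are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class PgCursorRead {
        public static void main(String[] args) throws Exception {
            // placeholder connection details
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/mydb", "user", "secret")) {
                // Without this, the PostgreSQL driver ignores the fetch size
                // and pulls the whole result set into memory at once.
                conn.setAutoCommit(false);
                try (PreparedStatement stmt =
                        conn.prepareStatement("select id, name, age from user_info")) {
                    stmt.setFetchSize(10000); // rows per server round trip
                    try (ResultSet rs = stmt.executeQuery()) {
                        while (rs.next()) {
                            // process one row at a time instead of buffering all rows
                        }
                    }
                }
            }
        }
    }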
|
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data4/HDATA/yarn/local/usercache/flink/appcache/application_1573355697642_32093/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data1/app/hadoop-2.7.3-snappy-32core12disk/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "I/O dispatcher 3" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.HashMap$KeySet.iterator(HashMap.java:917)
        at java.util.HashSet.iterator(HashSet.java:173)
        at java.util.Collections$UnmodifiableCollection$1.<init>(Collections.java:1039)
        at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
        at java.lang.Thread.run(Thread.java:748)
Exception in thread "I/O dispatcher 28"
Exception in thread "I/O dispatcher 32" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "I/O dispatcher 24" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "I/O dispatcher 26" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "I/O dispatcher 12"
Exception in thread "I/O dispatcher 17" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
        at java.lang.Thread.run(Thread.java:748)
Exception in thread "I/O dispatcher 18" java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
        at java.lang.Thread.run(Thread.java:748)
java.lang.OutOfMemoryError: GC overhead limit exceeded

On Tue, Nov 19, 2019 at 11:04 AM, 淘宝龙安 <[hidden email]> wrote:
|
Hi 龙安,

Firstly, I have two questions:

1) You said this is a batch job, yet you used the stream environment and stream table environment.
2) In the startup command I saw the "-yn" config option, which is not supported since Flink 1.8+. I guess you only started one TM container (-p/-s=1). If I am wrong, please correct me.

Can we first align on these two questions?

Best,
Vino

On Tue, Nov 19, 2019 at 1:32 PM, 淘宝龙安 <[hidden email]> wrote:
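For reference, a batch-style setup for Flink 1.9 with the old planner would look roughly like this (a sketch only; the JDBC source and Elasticsearch sink registrations from the earlier mail stay the same and are elided):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;

    // batch execution environment instead of the streaming one
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

    // register the JDBC sources and the Elasticsearch sink as before, then:
    tableEnv.sqlUpdate("insert into output_table select ...");
    env.execute(taskName); // same job-name variable as in the earlier snippet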
|
Hi 龙安,

Sorry, I did not know you used the Blink planner.

> Before using NumericBetweenParametersProvider, the job read data from the database with just one TaskManager, even though I have more than one TM.

About using only one TM: it seems to be a known issue. [1]

Best,
Vino

On Tue, Nov 19, 2019 at 9:26 PM, 淘宝龙安 <[hidden email]> wrote:
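For context, NumericBetweenParametersProvider splits a JDBC scan into parallel range queries so that different subtasks read disjoint slices of the table. A minimal sketch against the Flink 1.9 flink-jdbc API (the numeric key column, its min/max bounds, and the 10,000 batch size are assumptions here, and the provider's constructor has varied across Flink versions):

    import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
    import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;

    // assumed bounds of a numeric key column used for splitting
    long minId = 1L;
    long maxId = 1_000_000L;

    JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(configDO.getJdbcDriver())
        .setDBUrl(configDO.getJDBCUrl())
        .setUsername(configDO.getDbUser())
        .setPassword(configDO.getDbPassword())
        .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
        // the two placeholders receive each split's lower and upper bound
        .setQuery("select id, user_id, price from order_info where id between ? and ?")
        .setParametersProvider(new NumericBetweenParametersProvider(10_000L, minId, maxId))
        .finish();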
|