http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-run-multiple-InputFormat-on-multiple-nodes-tp28794.html
Dear all,
I have installed a Flink standalone cluster with 3 nodes, and I want to select some data from a database. For better performance, I split the one SQL query into many range queries, and I want these queries to run distributed across the 3 nodes. But in practice they all run on the same node. Here is my code:
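```java
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
for (int i = 0; i < 100; i++) {
    String from = String.valueOf(i * 1000);
    String to = String.valueOf((i + 1) * 1000);
    // Half-open range [from, to) so an id on a boundary is not read by two queries.
    String sql = "select * from table a where a.id >= " + from + " and a.id < " + to;
    // Each format has a single input split (no parameters provider), so one task reads it;
    // rebalance() only redistributes the rows after they have been read.
    DataSet<Row> source = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername("org.postgresql.Driver")
            .setDBUrl("jdbc:postgresql://****:5432/test")
            .setUsername("root")
            .setPassword("****")
            .setQuery(sql)
            .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DATE_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BIG_DEC_TYPE_INFO))
            .finish()).rebalance();
    // MyFlot is my own FlatMapFunction<Row, Tuple2<String, Float>>.
    DataSet<Tuple2<String, Float>> top = source.flatMap(new MyFlot()).groupBy(0).sum(1)
            .sortPartition(1, Order.DESCENDING).setParallelism(1) // one partition => global top 10
            .first(10);
    top.print(); // print() runs a job immediately, so this loop submits 100 separate jobs
}
// print() already ran the plan; a following env.execute() with no new sinks would throw.
```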
So how can I run these SQL queries in a distributed way, to make the best use of the Flink cluster? If all the database reads run on only one node, it runs out of memory (OOM) 😢. I hope someone can help me. Thanks!
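A minimal sketch of one way to spread the reads, assuming the pre-1.11 `flink-jdbc` module that provides the `JDBCInputFormat` used above. Instead of building 100 separate input formats (each of which yields a single input split when no parameters provider is set), build one parameterized query and pass the 100 ranges through `setParametersProvider(new GenericParameterValuesProvider(...))`. The format should then create one input split per parameter row, and the 3 parallel source tasks request splits as they go, so the range queries are shared among them. The class name `JdbcRangeRead`, the helper `rangeParameters`, and the assumption that `id` is a numeric column are hypothetical; the connection settings, table name and row type are copied from the question.

```java
import java.io.Serializable;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class JdbcRangeRead {

    // Hypothetical helper: one parameter row {lower, upper} per half-open range [i*step, (i+1)*step).
    public static Serializable[][] rangeParameters(int numRanges, long step) {
        Serializable[][] params = new Serializable[numRanges][];
        for (int i = 0; i < numRanges; i++) {
            params[i] = new Serializable[] {i * step, (i + 1) * step};
        }
        return params;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        JDBCInputFormat format = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://****:5432/test")
                .setUsername("root")
                .setPassword("****")
                // One query template; the two "?" placeholders are bound per input split.
                .setQuery("select * from table a where a.id >= ? and a.id < ?")
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DATE_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BIG_DEC_TYPE_INFO))
                // 100 parameter rows => 100 input splits, shared among the 3 parallel source tasks.
                .setParametersProvider(new GenericParameterValuesProvider(rangeParameters(100, 1000L)))
                .finish();

        DataSet<Row> source = env.createInput(format);
        // The flatMap/groupBy/sum/top-10 steps from the question would be chained here, once.
        System.out.println(source.count()); // count() triggers a single job
    }
}
```

Whether the 3 source tasks actually land on 3 different machines still depends on how TaskManager slots are allocated (for example, one slot per TaskManager on each node would force it). Building the pipeline once and calling a single `print()` at the end also avoids submitting one job per range.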