How to run multiple InputFormat on multiple nodes

Posted by HAN QIU on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-run-multiple-InputFormat-on-multiple-nodes-tp28794.html

Dears, 
    I have installed the Flink standalone cluster with 3 nodes, and I want to select some data from database. For high performance, I split the one sql to many, and I want these many sqls can run on the 3 nodes distributed.But actually these sqls are runing on the same node. Here is my codes.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);

    for (int i = 0; i < 100; i++) {
        String from = String.valueOf(i*1000);
        String to = String.valueOf((i+1)*1000);
        String sql = "select * from table a where a.id >= " + from + " and a.id <= " + to;
        DataSet<Row> source = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://****:5432/test")
                .setUsername("root")
                .setPassword("****")
                .setQuery(sql)
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DATE_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BIG_DEC_TYPE_INFO))
                .finish()).rebalance();
        DataSet<Tuple2<String, Float>> top = source.flatMap(new MyFlot()).groupBy(0).sum(1).sortPartition(1, Order.DESCENDING).first(10);
        top.print();
    }
    env.execute();
    So how can I run thees sqls distributed so that can make the best use of the flink cluster. There will occur OOM if the read database operation run only on one node😢, Hope someone can help me, Thx!