How to run multiple InputFormat on multiple nodes

HAN QIU
Dear all,
    I have installed a Flink standalone cluster with 3 nodes, and I want to select some data from a database. For better performance, I split one SQL query into many, and I want these queries to run distributed across the 3 nodes. But in practice the queries all run on the same node. Here is my code.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);

    for (int i = 0; i < 100; i++) {
        String from = String.valueOf(i*1000);
        String to = String.valueOf((i+1)*1000);
        String sql = "select * from table a where a.id >= " + from + " and a.id <= " + to;
        DataSet<Row> source = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://****:5432/test")
                .setUsername("root")
                .setPassword("****")
                .setQuery(sql)
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DATE_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BIG_DEC_TYPE_INFO))
                .finish()).rebalance();
        DataSet<Tuple2<String, Float>> top = source.flatMap(new MyFlot()).groupBy(0).sum(1).sortPartition(1, Order.DESCENDING).first(10);
        top.print();
    }
    env.execute();
    So how can I run these queries in a distributed way, to make the best use of the Flink cluster? An OOM will occur if the database read runs on only one node😢. I hope someone can help me. Thanks!

Re: How to run multiple InputFormat on multiple nodes

Zhu Zhu
Hi Han, from your description, it looks like the 3 source nodes you created are running in the same shared slot, and thus on the same node.
So far the DataSet API does not support customizing the slot sharing group, so we cannot force the sources into different slot sharing groups to avoid that.
But I think the correct way to run JDBCInputFormat in parallel is to use a parameterized query template, as documented in JDBCInputFormat.

Serializable[][] queryParameters = new String[2][1];
queryParameters[0] = new String[]{"Kumar"};
queryParameters[1] = new String[]{"Tan Ah Teck"};

JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
        .setDBUrl("jdbc:derby:memory:ebookshop")
        .setQuery("select * from books WHERE author = ?")
        .setRowTypeInfo(rowTypeInfo)
        .setParametersProvider(new GenericParameterValuesProvider(queryParameters))
        .finish();
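
Applied to the range query from the original question, the same idea could look roughly like the sketch below. It only builds the parameter matrix, one {from, to} row per range, using plain Java so it runs without Flink on the classpath. The class name RangeParameters and the method buildRanges are hypothetical names chosen for illustration, and the split count and step size are taken from the loop in the question. Note that the ranges in the question overlap at their boundaries (id 1000 matches both the first and the second query), so this sketch assumes inclusive, non-overlapping bounds instead.

```java
import java.io.Serializable;

// Hypothetical helper, not part of Flink: builds the query parameters
// for a template such as "... where a.id >= ? and a.id <= ?".
public class RangeParameters {

    // Returns one {from, to} row per range; both bounds are inclusive.
    static Serializable[][] buildRanges(int splits, long step) {
        Serializable[][] params = new Serializable[splits][];
        for (int i = 0; i < splits; i++) {
            long from = i * step;
            long to = (i + 1) * step - 1; // stop one short of the next range's start
            params[i] = new Long[]{from, to};
        }
        return params;
    }

    public static void main(String[] args) {
        // Same 100 ranges of 1000 ids as the loop in the question.
        Serializable[][] params = buildRanges(100, 1000L);
        System.out.println(params.length + " ranges, first = ["
                + params[0][0] + ", " + params[0][1] + "]");
    }
}
```

The resulting array would then be passed to a single JDBCInputFormat via setParametersProvider(new GenericParameterValuesProvider(params)), with setQuery given a template like "select * from table a where a.id >= ? and a.id <= ?", replacing the loop that creates 100 separate sources. As far as I understand, each parameter row then becomes its own input split that the parallel source subtasks can pick up, but please check this against the JDBCInputFormat documentation for your Flink version.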

(In reply to the message above from HAN QIU <[hidden email]>, sent Wed, Jul 17, 2019 at 9:51 AM.)